Skip to content

Commit 1369ff5

Browse files
author
Justin Uang
committed
blah
1 parent 8377468 commit 1369ff5

File tree

2 files changed

+109
-0
lines changed

2 files changed

+109
-0
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.adaptive
19+
20+
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
21+
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
22+
import org.apache.spark.sql.catalyst.plans.Inner
23+
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
24+
import org.apache.spark.sql.execution.{RangeExec, SortExec}
25+
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
26+
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
27+
import org.apache.spark.sql.internal.SQLConf
28+
import org.apache.spark.sql.test.SharedSQLContext
29+
30+
class PlanQueryStageTest extends SharedSQLContext {
31+
32+
test("Replaces ShuffleExchangeExec/BroadcastExchangeExec with reuse disabled") {
33+
val range = org.apache.spark.sql.catalyst.plans.logical.Range(1, 100, 1, 1)
34+
val originalPlan = ShuffleExchangeExec(
35+
HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
36+
RangeExec(range))
37+
38+
val conf = new SQLConf
39+
conf.setConfString("spark.sql.exchange.reuse", "false")
40+
val planQueryStage = PlanQueryStage(conf)
41+
val newPlan = planQueryStage(originalPlan)
42+
43+
val expectedPlan = ResultQueryStage(
44+
ShuffleQueryStageInput(
45+
ShuffleQueryStage(originalPlan),
46+
range.output))
47+
48+
assert(newPlan == expectedPlan)
49+
}
50+
51+
test("Reuses ShuffleQueryStage when possible") {
52+
val conf = new SQLConf
53+
conf.setConfString("spark.sql.exchange.reuse", "true")
54+
55+
val planQueryStage = PlanQueryStage(conf)
56+
val newPlan = planQueryStage(createMergeJoinPlan(100, 100))
57+
58+
val collected = newPlan.collect {
59+
case e: ShuffleQueryStageInput => e.childStage
60+
}
61+
62+
assert(collected.length == 2)
63+
assert(collected(0).eq(collected(1)))
64+
}
65+
66+
test("Creates multiple ShuffleQueryStages when stages are different") {
67+
val conf = new SQLConf
68+
conf.setConfString("spark.sql.exchange.reuse", "true")
69+
70+
val planQueryStage = PlanQueryStage(conf)
71+
val newPlan = planQueryStage(createMergeJoinPlan(100, 101))
72+
73+
val collected = newPlan.collect {
74+
case e: ShuffleQueryStageInput => e.childStage
75+
}
76+
77+
assert(collected.length == 2)
78+
assert(!collected(0).eq(collected(1)))
79+
}
80+
81+
def createMergeJoinPlan(leftNum: Int, rightNum: Int): SortMergeJoinExec = {
82+
val left = SortExec(
83+
Seq(SortOrder(UnresolvedAttribute("blah"), Ascending)),
84+
true,
85+
ShuffleExchangeExec(
86+
HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
87+
RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(1, leftNum, 1, 1))))
88+
89+
val right = SortExec(
90+
Seq(SortOrder(UnresolvedAttribute("blah"), Ascending)),
91+
true,
92+
ShuffleExchangeExec(
93+
HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
94+
RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(1, rightNum, 1, 1))))
95+
96+
SortMergeJoinExec(
97+
Seq(UnresolvedAttribute("blah")),
98+
Seq(UnresolvedAttribute("blah")),
99+
Inner,
100+
None,
101+
left,
102+
right)
103+
}
104+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.apache.spark.sql.execution.adaptive
2+
3+
class QueryStageTest {
4+
5+
}

0 commit comments

Comments
 (0)