Skip to content

Commit 5a4cc08

Browse files
author
Justin Uang
committed
blah
1 parent 1369ff5 commit 5a4cc08

File tree

1 file changed

+59
-1
lines changed

1 file changed

+59
-1
lines changed
Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.spark.sql.execution.adaptive
219

3-
class QueryStageTest {
20+
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
21+
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
22+
import org.apache.spark.sql.catalyst.plans.Inner
23+
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
24+
import org.apache.spark.sql.execution.{RangeExec, SortExec}
25+
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
26+
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
27+
import org.apache.spark.sql.internal.SQLConf
28+
import org.apache.spark.sql.test.SharedSQLContext
29+
30+
class QueryStageTest extends SharedSQLContext {
31+
32+
test("Replaces ShuffleExchangeExec/BroadcastExchangeExec with reuse disabled") {
33+
val plan = createMergeJoinPlan(100, 100)
34+
35+
val resultQueryStage = ResultQueryStage(plan)
36+
37+
resultQueryStage.execute()
38+
}
39+
40+
def createMergeJoinPlan(leftNum: Int, rightNum: Int): SortMergeJoinExec = {
41+
val left = SortExec(
42+
Seq(SortOrder(UnresolvedAttribute("blah"), Ascending)),
43+
true,
44+
ShuffleExchangeExec(
45+
HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
46+
RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(1, leftNum, 1, 1))))
47+
48+
val right = SortExec(
49+
Seq(SortOrder(UnresolvedAttribute("blah"), Ascending)),
50+
true,
51+
ShuffleExchangeExec(
52+
HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
53+
RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(1, rightNum, 1, 1))))
454

55+
SortMergeJoinExec(
56+
Seq(UnresolvedAttribute("blah")),
57+
Seq(UnresolvedAttribute("blah")),
58+
Inner,
59+
None,
60+
left,
61+
right)
62+
}
563
}

0 commit comments

Comments
 (0)