
Commit 25892f3

mgaido91 authored and cloud-fan committed
[SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer
## What changes were proposed in this pull request?

Added a new rule to remove a Sort operation when its child is already sorted. For instance, this simple code:

```
spark.sparkContext.parallelize(Seq(("a", "b"))).toDF("a", "b").registerTempTable("table1")
val df = sql(s"""SELECT b
                | FROM (
                |     SELECT a, b
                |     FROM table1
                |     ORDER BY a
                | ) t
                | ORDER BY a""".stripMargin)
df.explain(true)
```

before the PR produces this plan:

```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- Sort [a#6 ASC NULLS FIRST], true
         +- Project [_1#3 AS a#6, _2#4 AS b#7]
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
               +- ExternalRDD [obj#2]

== Physical Plan ==
*(3) Project [b#7]
+- *(3) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
      +- *(2) Project [b#7, a#6]
         +- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
            +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200)
               +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
                     +- Scan ExternalRDDScan[obj#2]
```

while after the PR produces:

```
== Parsed Logical Plan ==
'Sort ['a ASC NULLS FIRST], true
+- 'Project ['b]
   +- 'SubqueryAlias t
      +- 'Sort ['a ASC NULLS FIRST], true
         +- 'Project ['a, 'b]
            +- 'UnresolvedRelation `table1`

== Analyzed Logical Plan ==
b: string
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [b#7, a#6]
      +- SubqueryAlias t
         +- Sort [a#6 ASC NULLS FIRST], true
            +- Project [a#6, b#7]
               +- SubqueryAlias table1
                  +- Project [_1#3 AS a#6, _2#4 AS b#7]
                     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
                        +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [b#7]
+- Sort [a#6 ASC NULLS FIRST], true
   +- Project [_1#3 AS a#6, _2#4 AS b#7]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
         +- ExternalRDD [obj#2]

== Physical Plan ==
*(2) Project [b#7]
+- *(2) Sort [a#6 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 5)
      +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7]
         +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
            +- Scan ExternalRDDScan[obj#2]
```

This means the unnecessary sort operation is no longer performed after the PR.

## How was this patch tested?

Added UT.

Author: Marco Gaido <[email protected]>

Closes apache#20560 from mgaido91/SPARK-23375.
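For readers following along, a quick way to confirm the rule fired (a hedged sketch: `df` is the DataFrame from the example above, and exact plan shapes can vary across Spark versions):

```scala
import org.apache.spark.sql.catalyst.plans.logical.Sort

// After this patch the optimized plan above contains a single Sort node
// instead of two; the inner redundant Sort has been removed.
val sorts = df.queryExecution.optimizedPlan.collect { case s: Sort => s }
assert(sorts.size == 1)
```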
1 parent 4dfd746 commit 25892f3

File tree

10 files changed: +175, -24 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -138,6 +138,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       operatorOptimizationBatch) :+
     Batch("Join Reorder", Once,
       CostBasedJoinReorder) :+
+    Batch("Remove Redundant Sorts", Once,
+      RemoveRedundantSorts) :+
     Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates) :+
     Batch("Object Expressions Optimization", fixedPoint,
@@ -733,6 +735,16 @@ object EliminateSorts extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
+      child
+  }
+}
+
 /**
  * Removes filters that can be evaluated trivially. This can be done through the following ways:
  * 1) by eliding the filter for cases where it will always evaluate to `true`.
```
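For intuition about the guard in the rule above: `SortOrder.orderingSatisfies(child.outputOrdering, orders)` asks whether the ordering the child already guarantees covers the requested one, essentially a prefix check. A minimal standalone sketch with simplified stand-in types, not the real Catalyst implementation:

```scala
// Simplified stand-in for Catalyst's SortOrder: the real class also carries
// the sort expression, null ordering, and same-order expressions.
case class SimpleOrder(column: String, ascending: Boolean, nullsFirst: Boolean)

// Sketch of the prefix-satisfaction check: a child ordering satisfies a
// required ordering when every required entry matches the corresponding
// leading entry of what the child already guarantees.
def orderingSatisfiesSketch(
    childOrdering: Seq[SimpleOrder],
    required: Seq[SimpleOrder]): Boolean = {
  if (required.isEmpty) {
    true
  } else if (childOrdering.size < required.size) {
    false
  } else {
    childOrdering.zip(required).forall { case (have, need) => have == need }
  }
}

// A child sorted by (a ASC, b DESC) satisfies a request for (a ASC) alone,
// but not a request for (b DESC) alone.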

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -219,6 +219,11 @@ abstract class LogicalPlan
    * Refreshes (or invalidates) any metadata/data cached in the plan recursively.
    */
   def refresh(): Unit = children.foreach(_.refresh())
+
+  /**
+   * Returns the output ordering that this plan generates.
+   */
+  def outputOrdering: Seq[SortOrder] = Nil
 }
 
 /**
@@ -274,3 +279,7 @@ abstract class BinaryNode extends LogicalPlan {
 
   override final def children: Seq[LogicalPlan] = Seq(left, right)
 }
+
+abstract class OrderPreservingUnaryNode extends UnaryNode {
+  override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
+}
```
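The propagation pattern these two additions set up can be seen in isolation with a toy plan hierarchy (a sketch only; the names here are invented, and the real nodes are the Catalyst operators changed below):

```scala
// Toy mirror of the LogicalPlan additions: ordering defaults to Nil and
// flows unchanged through order-preserving unary nodes.
sealed trait ToyPlan {
  def outputOrdering: Seq[String] = Nil
}
case class ToyLeaf(name: String) extends ToyPlan
case class ToySort(keys: Seq[String], child: ToyPlan) extends ToyPlan {
  override def outputOrdering: Seq[String] = keys
}
abstract class ToyOrderPreserving extends ToyPlan {
  def child: ToyPlan
  override final def outputOrdering: Seq[String] = child.outputOrdering
}
case class ToyProject(cols: Seq[String], child: ToyPlan) extends ToyOrderPreserving
case class ToyFilter(pred: String, child: ToyPlan) extends ToyOrderPreserving

// A Project over a Filter over a Sort still reports the Sort's keys:
// ToyProject(Seq("b"), ToyFilter("a > 10", ToySort(Seq("a"), ToyLeaf("t"))))
//   .outputOrdering == Seq("a")
```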

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 17 additions & 6 deletions
```diff
@@ -43,11 +43,12 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode {
  * This node is inserted at the top of a subquery when it is optimized. This makes sure we can
  * recognize a subquery as such, and it allows us to write subquery aware transformations.
  */
-case class Subquery(child: LogicalPlan) extends UnaryNode {
+case class Subquery(child: LogicalPlan) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
-case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
+case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
+  extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
   override def maxRows: Option[Long] = child.maxRows
 
@@ -125,7 +126,7 @@ case class Generate(
 }
 
 case class Filter(condition: Expression, child: LogicalPlan)
-  extends UnaryNode with PredicateHelper {
+  extends OrderPreservingUnaryNode with PredicateHelper {
   override def output: Seq[Attribute] = child.output
 
   override def maxRows: Option[Long] = child.maxRows
@@ -469,6 +470,7 @@ case class Sort(
     child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = child.maxRows
+  override def outputOrdering: Seq[SortOrder] = order
 }
 
 /** Factory for constructing new `Range` nodes. */
@@ -522,6 +524,15 @@ case class Range(
   override def computeStats(): Statistics = {
     Statistics(sizeInBytes = LongType.defaultSize * numElements)
   }
+
+  override def outputOrdering: Seq[SortOrder] = {
+    val order = if (step > 0) {
+      Ascending
+    } else {
+      Descending
+    }
+    output.map(a => SortOrder(a, order))
+  }
 }
 
 case class Aggregate(
@@ -728,7 +739,7 @@ object Limit {
  *
  * See [[Limit]] for more information.
  */
-case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
+case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = {
     limitExpr match {
@@ -744,7 +755,7 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
  *
  * See [[Limit]] for more information.
  */
-case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
+case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override def maxRowsPerPartition: Option[Long] = {
@@ -764,7 +775,7 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
 case class SubqueryAlias(
     alias: String,
     child: LogicalPlan)
-  extends UnaryNode {
+  extends OrderPreservingUnaryNode {
 
   override def doCanonicalize(): LogicalPlan = child.canonicalized
```
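A user-visible consequence of the `Range` change (a hedged sketch; `spark` is assumed to be an active SparkSession, and the assertion mirrors the optimizer tests below):

```scala
import org.apache.spark.sql.catalyst.plans.logical.Sort
import org.apache.spark.sql.functions.col

// step > 0, so Range reports an ascending output ordering on `id`;
// an ascending sort on `id` should therefore be optimized away.
val sortedAsc = spark.range(0L, 1000L, 1L).sort(col("id"))
assert(sortedAsc.queryExecution.optimizedPlan
  .collect { case s: Sort => s }.isEmpty)
```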

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantSortsSuite.scala

Lines changed: 101 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, ORDER_BY_ORDINAL}

class RemoveRedundantSortsSuite extends PlanTest {

  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("Remove Redundant Sorts", Once,
        RemoveRedundantSorts) ::
      Batch("Collapse Project", Once,
        CollapseProject) :: Nil
  }

  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

  test("remove redundant order by") {
    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
    val unnecessaryReordered = orderedPlan.select('a).orderBy('a.asc, 'b.desc_nullsFirst)
    val optimized = Optimize.execute(unnecessaryReordered.analyze)
    val correctAnswer = orderedPlan.select('a).analyze
    comparePlans(Optimize.execute(optimized), correctAnswer)
  }

  test("do not remove sort if the order is different") {
    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
    val reorderedDifferently = orderedPlan.select('a).orderBy('a.asc, 'b.desc)
    val optimized = Optimize.execute(reorderedDifferently.analyze)
    val correctAnswer = reorderedDifferently.analyze
    comparePlans(optimized, correctAnswer)
  }

  test("filters don't affect order") {
    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
    val filteredAndReordered = orderedPlan.where('a > Literal(10)).orderBy('a.asc, 'b.desc)
    val optimized = Optimize.execute(filteredAndReordered.analyze)
    val correctAnswer = orderedPlan.where('a > Literal(10)).analyze
    comparePlans(optimized, correctAnswer)
  }

  test("limits don't affect order") {
    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
    val filteredAndReordered = orderedPlan.limit(Literal(10)).orderBy('a.asc, 'b.desc)
    val optimized = Optimize.execute(filteredAndReordered.analyze)
    val correctAnswer = orderedPlan.limit(Literal(10)).analyze
    comparePlans(optimized, correctAnswer)
  }

  test("range is already sorted") {
    val inputPlan = Range(1L, 1000L, 1, 10)
    val orderedPlan = inputPlan.orderBy('id.asc)
    val optimized = Optimize.execute(orderedPlan.analyze)
    val correctAnswer = inputPlan.analyze
    comparePlans(optimized, correctAnswer)

    val reversedPlan = inputPlan.orderBy('id.desc)
    val reversedOptimized = Optimize.execute(reversedPlan.analyze)
    val reversedCorrectAnswer = reversedPlan.analyze
    comparePlans(reversedOptimized, reversedCorrectAnswer)

    val negativeStepInputPlan = Range(10L, 1L, -1, 10)
    val negativeStepOrderedPlan = negativeStepInputPlan.orderBy('id.desc)
    val negativeStepOptimized = Optimize.execute(negativeStepOrderedPlan.analyze)
    val negativeStepCorrectAnswer = negativeStepInputPlan.analyze
    comparePlans(negativeStepOptimized, negativeStepCorrectAnswer)
  }

  test("sort should not be removed when there is a node which doesn't guarantee any order") {
    val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc)
    val groupedAndResorted = orderedPlan.groupBy('a)(sum('a)).orderBy('a.asc)
    val optimized = Optimize.execute(groupedAndResorted.analyze)
    val correctAnswer = groupedAndResorted.analyze
    comparePlans(optimized, correctAnswer)
  }
}
```

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -99,7 +99,7 @@ class CacheManager extends Logging {
       sparkSession.sessionState.conf.columnBatchSize, storageLevel,
       sparkSession.sessionState.executePlan(planToCache).executedPlan,
       tableName,
-      planToCache.stats)
+      planToCache)
     cachedData.add(CachedData(planToCache, inMemoryRelation))
   }
 }
@@ -148,7 +148,7 @@ class CacheManager extends Logging {
         storageLevel = cd.cachedRepresentation.storageLevel,
         child = spark.sessionState.executePlan(cd.plan).executedPlan,
         tableName = cd.cachedRepresentation.tableName,
-        statsOfPlanToCache = cd.plan.stats)
+        logicalPlan = cd.plan)
       needToRecache += cd.copy(cachedRepresentation = newCache)
     }
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -125,7 +125,7 @@ case class LogicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
     outputPartitioning: Partitioning = UnknownPartitioning(0),
-    outputOrdering: Seq[SortOrder] = Nil,
+    override val outputOrdering: Seq[SortOrder] = Nil,
     override val isStreaming: Boolean = false)(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
```

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 10 additions & 7 deletions
```diff
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.LongAccumulator
@@ -39,9 +39,9 @@ object InMemoryRelation {
       storageLevel: StorageLevel,
       child: SparkPlan,
       tableName: Option[String],
-      statsOfPlanToCache: Statistics): InMemoryRelation =
+      logicalPlan: LogicalPlan): InMemoryRelation =
     new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
-      statsOfPlanToCache = statsOfPlanToCache)
+      statsOfPlanToCache = logicalPlan.stats, outputOrdering = logicalPlan.outputOrdering)
 }
 
 
@@ -64,7 +64,8 @@ case class InMemoryRelation(
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
     val sizeInBytesStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
-    statsOfPlanToCache: Statistics)
+    statsOfPlanToCache: Statistics,
+    override val outputOrdering: Seq[SortOrder])
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -76,7 +77,8 @@ case class InMemoryRelation(
       tableName = None)(
       _cachedColumnBuffers,
       sizeInBytesStats,
-      statsOfPlanToCache)
+      statsOfPlanToCache,
+      outputOrdering)
 
   override def producedAttributes: AttributeSet = outputSet
 
@@ -159,7 +161,7 @@ case class InMemoryRelation(
   def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = {
     InMemoryRelation(
       newOutput, useCompression, batchSize, storageLevel, child, tableName)(
-      _cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache)
+      _cachedColumnBuffers, sizeInBytesStats, statsOfPlanToCache, outputOrdering)
   }
 
   override def newInstance(): this.type = {
@@ -172,7 +174,8 @@ case class InMemoryRelation(
       tableName)(
       _cachedColumnBuffers,
       sizeInBytesStats,
-      statsOfPlanToCache).asInstanceOf[this.type]
+      statsOfPlanToCache,
+      outputOrdering).asInstanceOf[this.type]
   }
 
   def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers
```
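The net effect of threading the logical plan into `InMemoryRelation` (a hedged sketch, assuming an active SparkSession `spark`; the SPARK-23375 test in PlannerSuite below exercises the same behavior end-to-end):

```scala
import org.apache.spark.sql.catalyst.plans.logical.Sort
import org.apache.spark.sql.functions.col

val cached = spark.range(100).sort(col("id").desc).cache()
cached.count()  // materialize the InMemoryRelation

// The cache now remembers its output ordering, so an identical re-sort
// over the cached data is elided by RemoveRedundantSorts:
val resorted = cached.sort(col("id").desc)
assert(resorted.queryExecution.optimizedPlan
  .collect { case s: Sort => s }.isEmpty)
```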

sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala

Lines changed: 1 addition & 1 deletion
(An ascending sort over an ascending `Range` is now eliminated as redundant, so the test sorts descending to keep triggering a real sort.)

```diff
@@ -39,7 +39,7 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
     def computeChiSquareTest(): Double = {
       val n = 10000
       // Trigger a sort
-      val data = spark.range(0, n, 1, 1).sort('id)
+      val data = spark.range(0, n, 1, 1).sort('id.desc)
         .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()
 
       // Compute histogram for the number of records per partition post sort
```

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 14 additions & 1 deletion
```diff
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
@@ -197,6 +197,19 @@ class PlannerSuite extends SharedSQLContext {
     assert(planned.child.isInstanceOf[CollectLimitExec])
   }
 
+  test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") {
+    val query = testData.select('key, 'value).sort('key.desc).cache()
+    assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation])
+    val resorted = query.sort('key.desc)
+    assert(resorted.queryExecution.optimizedPlan.collect { case s: Sort => s }.isEmpty)
+    assert(resorted.select('key).collect().map(_.getInt(0)).toSeq ==
+      (1 to 100).reverse)
+    // with a different order, the sort is needed
+    val sortedAsc = query.sort('key)
+    assert(sortedAsc.queryExecution.optimizedPlan.collect { case s: Sort => s }.size == 1)
+    assert(sortedAsc.select('key).collect().map(_.getInt(0)).toSeq == (1 to 100))
+  }
+
   test("PartitioningCollection") {
     withTempView("normal", "small", "tiny") {
       testData.createOrReplaceTempView("normal")
```
