
Commit a45133b

cloud-fan authored and gatorsmile committed
[SPARK-21743][SQL] top-most limit should not cause memory leak
## What changes were proposed in this pull request?

For a top-most limit, we use a special operator to execute it: `CollectLimitExec`. `CollectLimitExec` retrieves `n` rows (where `n` is the limit) from each partition of the child plan output, see https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L311. It is very likely that we do not exhaust the child plan output. This is fine when whole-stage codegen is off, as the child plan releases its resources via a task completion listener. However, when whole-stage codegen is on, the resources can only be released once all output is consumed.

To fix this memory leak, one simple approach is: when `CollectLimitExec` retrieves `n` rows from the child plan output, the child plan output should contain only `n` rows; then the output is exhausted and the resources are released. This can be done by wrapping the child plan with `LocalLimit`.

## How was this patch tested?

A regression test.

Author: Wenchen Fan <[email protected]>

Closes apache#18955 from cloud-fan/leak.
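The core of the fix is an iterator-exhaustion observation. Below is a toy sketch in plain Scala (hypothetical helper names, not Spark's operators): `take(n)` does not exhaust its source, but a source that is itself pre-limited to `n` rows is exhausted once those `n` rows are consumed, which is what lets the child release its resources.

```scala
// Toy model of the bug and the fix, using plain Scala iterators.
// `collectLimit` plays the role of CollectLimitExec: it takes `n` rows
// from the child and reports whether the child ended up exhausted.
object ExhaustionSketch {
  def collectLimit(child: Iterator[Int], n: Int): (List[Int], Boolean) = {
    val rows = child.take(n).toList
    (rows, !child.hasNext) // exhausted only if no rows remain in the child
  }

  def main(args: Array[String]): Unit = {
    // Without the fix: the child has more rows than the limit, so it is
    // left unexhausted (in Spark, under whole-stage codegen, this is the leak).
    println(collectLimit(Iterator(1, 2, 3, 4), 2)) // child not exhausted

    // With the fix: the child is pre-limited (the LocalLimit analogue), so
    // consuming `limit` rows exhausts it and cleanup can run.
    println(collectLimit(Iterator(1, 2, 3, 4).take(2), 2)) // child exhausted
  }
}
```

This only models the exhaustion behavior; in Spark the actual release happens through the whole-stage-codegen machinery once its output iterator ends.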
1 parent b8ffb51 commit a45133b

File tree

3 files changed: +19 -1 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 6 additions & 1 deletion
```diff
@@ -72,7 +72,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.TakeOrderedAndProjectExec(
           limit, order, projectList, planLater(child)) :: Nil
       case logical.Limit(IntegerLiteral(limit), child) =>
-        execution.CollectLimitExec(limit, planLater(child)) :: Nil
+        // Normally wrapping child with `LocalLimitExec` here is a no-op, because
+        // `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
+        // calls `child.executeTake`. If child supports whole stage codegen, adding this
+        // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
+        // resource releasing work, after we consume `limit` rows.
+        execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
       case other => planLater(other) :: Nil
     }
   case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
```
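The new comment states that the extra `LocalLimitExec` wrap is normally a no-op for query results. A plain-Scala analogue of that claim (illustrative only, not Spark's operators): taking `limit` rows from a source already limited to `limit` rows yields exactly the same rows as taking them from the raw source.

```scala
// Sketch: double-limiting is a row-count no-op.
object NoOpWrapSketch {
  def main(args: Array[String]): Unit = {
    val rows = Seq(1, 2, 3, 4)
    val unwrapped = rows.iterator.take(2).toList         // CollectLimit only
    val wrapped   = rows.iterator.take(2).take(2).toList // LocalLimit, then CollectLimit
    assert(unwrapped == wrapped)                         // same rows either way
    println(wrapped)
  }
}
```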

sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 8 additions & 0 deletions
```diff
@@ -54,6 +54,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int

   override def output: Seq[Attribute] = child.output

+  // Do not enable whole stage codegen for a single limit.
+  override def supportCodegen: Boolean = child match {
+    case plan: CodegenSupport => plan.supportCodegen
+    case _ => false
+  }
+
+  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))
+
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
```
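Two behaviors in this hunk can be sketched with plain Scala collections (hypothetical names, not Spark's classes): `executeTake` clamps the request so the child is never asked for more than `limit` rows, and `doExecute` applies `iter.take(limit)` to each partition independently, so every partition can still yield up to `limit` rows.

```scala
// Toy analogue of BaseLimitExec's two methods added in this hunk.
object BaseLimitSketch {
  val limit = 2

  // executeTake analogue: clamp the request to `limit` before asking the child.
  def executeTake(n: Int, childRows: Seq[Int]): Seq[Int] =
    childRows.take(math.min(n, limit))

  // doExecute analogue: per-partition take, applied to each partition's iterator.
  def doExecute(partitions: Seq[Seq[Int]]): Seq[Seq[Int]] =
    partitions.map(_.iterator.take(limit).toList)

  def main(args: Array[String]): Unit = {
    println(executeTake(5, Seq(1, 2, 3, 4)))         // clamped to 2 rows
    println(doExecute(Seq(Seq(1, 2, 3), Seq(4, 5)))) // 2 rows per partition
  }
}
```

Note the per-partition take explains why `CollectLimitExec` still exists as a separate top-level operator: each partition may produce up to `limit` rows, so a final global cut is needed.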

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -2658,4 +2658,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
     }
   }
+
+  test("SPARK-21743: top-most limit should not cause memory leak") {
+    // In unit test, Spark will fail the query if memory leak detected.
+    spark.range(100).groupBy("id").count().limit(1).collect()
+  }
 }
```
