Skip to content

Commit 4704af4

Browse files
hvanhovellcloud-fan
authored andcommitted
[SPARK-27449] Move WholeStageCodegen.limitNotReachedCond class checks into separate methods.
## What changes were proposed in this pull request? This PR moves the checks done in `WholeStageCodegen.limitNotReachedCond` into a separate protected method. This makes it easier to introduce new leaf or blocking nodes. ## How was this patch tested? Existing tests. Closes apache#24358 from hvanhovell/SPARK-27449. Authored-by: herman <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 67bd124 commit 4704af4

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,14 +381,17 @@ trait CodegenSupport extends SparkPlan {
381381
*/
382382
def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
383383

384+
/**
385+
* Check if the node is supposed to produce limit not reached checks.
386+
*/
387+
protected def canCheckLimitNotReached: Boolean = children.isEmpty
388+
384389
/**
385390
* A helper method to generate the data producing loop condition according to the
386391
* limit-not-reached checks.
387392
*/
388393
final def limitNotReachedCond: String = {
389-
// InputAdapter is also a leaf node.
390-
val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
391-
if (!isLeafNode && !this.isInstanceOf[BlockingOperatorWithCodegen]) {
394+
if (!canCheckLimitNotReached) {
392395
val errMsg = "Only leaf nodes and blocking nodes need to call 'limitNotReachedCond' " +
393396
"in its data producing loop."
394397
if (Utils.isTesting) {
@@ -426,6 +429,9 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
426429
// that upstream operators will not generate useless conditions (which are always evaluated to
427430
// false) for the Limit operators after this blocking operator.
428431
override def limitNotReachedChecks: Seq[String] = Nil
432+
433+
// This is a blocking node so the node can produce these checks
434+
override protected def canCheckLimitNotReached: Boolean = true
429435
}
430436

431437
/**
@@ -500,6 +506,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
500506

501507
override def inputRDD: RDD[InternalRow] = child.execute()
502508

509+
// This is a leaf node so the node can produce limit not reached checks.
510+
override protected def canCheckLimitNotReached: Boolean = true
511+
503512
// InputAdapter does not need UnsafeProjection.
504513
protected val createUnsafeProjection: Boolean = false
505514

0 commit comments

Comments
 (0)