Skip to content

Commit c129972

Browse files
committed
[SPARK-51474][SQL] Don't insert redundant ColumnarToRowExec for node supporting both columnar and row output
### What changes were proposed in this pull request?
This patch fixes a corner case in `ApplyColumnarRulesAndInsertTransitions`. When a plan is required to output rows, if the node supports both columnar and row output, the rule currently adds a redundant `ColumnarToRowExec` to its upstream.

### Why are the changes needed?
This fix avoids a redundant `ColumnarToRowExec`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50239 from viirya/fix_columnar.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
1 parent 84f5fd9 commit c129972

File tree

2 files changed: +26 −0 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,10 @@ case class ApplyColumnarRulesAndInsertTransitions(
       case write: DataWritingCommandExec
           if write.cmd.isInstanceOf[V1WriteCommand] && conf.plannedWriteEnabled =>
         write.child.supportsColumnar
+      // If it is not required to output columnar (`outputsColumnar` is false), and the plan
+      // supports row-based and columnar, we don't need to output row-based data on its children
+      // nodes. So we set `outputsColumnar` to true.
+      case _ if plan.supportsColumnar && plan.supportsRowBased => true
       case _ =>
         false
     }

sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.vectorized.ColumnarBatch

 class ColumnarRulesSuite extends PlanTest with SharedSparkSession {
2829

@@ -51,6 +52,15 @@ class ColumnarRulesSuite extends PlanTest with SharedSparkSession {
     val appliedTwice = rules.apply(appliedOnce)
     assert(appliedTwice == expected)
   }
+
+  test("SPARK-51474: Don't insert redundant ColumnarToRowExec") {
+    val rules = ApplyColumnarRulesAndInsertTransitions(
+      spark.sessionState.columnarRules, false)
+
+    val plan = CanDoColumnarAndRowOp(UnaryOp(LeafOp(true), true))
+    val appliedOnce = rules.apply(plan)
+    assert(appliedOnce == plan)
+  }
 }
5565

5666
case class LeafOp(override val supportsColumnar: Boolean) extends LeafExecNode {
@@ -63,3 +73,15 @@ case class UnaryOp(child: SparkPlan, override val supportsColumnar: Boolean) ext
   override def output: Seq[Attribute] = child.output
   override protected def withNewChildInternal(newChild: SparkPlan): UnaryOp = copy(child = newChild)
 }
+
+case class CanDoColumnarAndRowOp(child: SparkPlan) extends UnaryExecNode {
+  override val supportsRowBased: Boolean = true
+  override val supportsColumnar: Boolean = true
+
+  override protected def doExecute(): RDD[InternalRow] = throw SparkUnsupportedOperationException()
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
+    throw SparkUnsupportedOperationException()
+  override def output: Seq[Attribute] = child.output
+  override protected def withNewChildInternal(newChild: SparkPlan): CanDoColumnarAndRowOp =
+    copy(child = newChild)
+}

0 commit comments

Comments
 (0)