Skip to content

Commit 10f9e42

Browse files
committed
fix "DPP with native_datafusion scan - join with dynamic partition pruning"
1 parent 288a248 commit 10f9e42

File tree

1 file changed

+20
-5
lines changed

1 file changed

+20
-5
lines changed

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 20 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ package org.apache.spark.sql.comet
2222
import org.apache.spark.rdd.RDD
2323
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
2424
import org.apache.spark.sql.catalyst.expressions._
25+
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
2526
import org.apache.spark.sql.catalyst.plans.QueryPlan
2627
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
2728
import org.apache.spark.sql.comet.shims.ShimCometScanExec
@@ -85,7 +86,11 @@ case class CometNativeScanExec(
8586
* Prepare DPP subquery plans. Called by Spark's prepare() before doExecuteColumnar().
8687
*/
8788
override protected def doPrepare(): Unit = {
88-
partitionFilters.foreach {
89+
// Use originalPlan.partitionFilters (not partitionFilters) because AQE's
90+
// PlanDynamicPruningFilters may transform InSubqueryExec → Literal.TrueLiteral
91+
// via makeCopy, but originalPlan is not in the active plan tree so it retains
92+
// the original InSubqueryExec needed for DPP preparation.
93+
originalPlan.partitionFilters.foreach {
8994
case DynamicPruningExpression(e: InSubqueryExec) =>
9095
e.plan.prepare()
9196
case _ =>
@@ -97,8 +102,12 @@ case class CometNativeScanExec(
97102
* Lazy partition serialization - deferred until execution time for DPP support.
98103
*/
99104
@transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = {
100-
// Wait for DPP subqueries to resolve before accessing partitions
101-
partitionFilters.foreach {
105+
// Wait for DPP subqueries to resolve before accessing partitions.
106+
// Use originalPlan.partitionFilters (not partitionFilters) because AQE's
107+
// PlanDynamicPruningFilters may transform InSubqueryExec → Literal.TrueLiteral
108+
// via makeCopy, but originalPlan is not in the active plan tree so it retains
109+
// the original InSubqueryExec needed for DPP resolution.
110+
originalPlan.partitionFilters.foreach {
102111
case DynamicPruningExpression(e: InSubqueryExec) if e.values().isEmpty =>
103112
e.plan match {
104113
case sab: SubqueryAdaptiveBroadcastExec =>
@@ -118,16 +127,22 @@ case class CometNativeScanExec(
118127

119128
/** Get file partitions with DPP filtering applied. */
120129
private def getFilePartitions(): Seq[FilePartition] = {
130+
// Use originalPlan.partitionFilters (not partitionFilters) because AQE's
131+
// PlanDynamicPruningFilters may transform InSubqueryExec → Literal.TrueLiteral
132+
// via makeCopy, but originalPlan is not in the active plan tree so it retains
133+
// the original InSubqueryExec needed for DPP filtering.
134+
val dppFilters = originalPlan.partitionFilters
135+
121136
if (bucketedScan) {
122137
getDppFilteredBucketedFilePartitions(
123138
relation,
124-
partitionFilters,
139+
dppFilters,
125140
originalPlan.selectedPartitions,
126141
relation.bucketSpec.get,
127142
optionalBucketSet,
128143
optionalNumCoalescedBuckets)
129144
} else {
130-
getDppFilteredFilePartitions(relation, partitionFilters, originalPlan.selectedPartitions)
145+
getDppFilteredFilePartitions(relation, dppFilters, originalPlan.selectedPartitions)
131146
}
132147
}
133148

0 commit comments

Comments
 (0)