Commit fdaec64

chore: refactor v2 scan conversion (#1621)
1 parent 5a36451 commit fdaec64

File tree: 2 files changed, +61 -87 lines changed
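
In short, this commit folds the data source V2 scan cases that were previously spread across several guarded match arms (supported Parquet, unsupported Parquet, Iceberg, everything else) into a single transformV2Scan helper that first accumulates every applicable fallback reason in a ListBuffer, then either converts the scan or tags it with all reasons at once. Below is a minimal, self-contained Scala sketch of that accumulate-then-decide pattern; PlanNode, tag, and convertOrTag are illustrative stand-ins, not Comet or Spark APIs.

import scala.collection.mutable.ListBuffer

object ScanConversionSketch {
  // Stand-in for a query plan node that can carry fallback annotations.
  final case class PlanNode(name: String, tags: Set[String] = Set.empty)

  // Stand-in for Comet's withInfo: annotate the node and hand it back.
  def tag(plan: PlanNode, reasons: Seq[String]): PlanNode =
    plan.copy(tags = plan.tags ++ reasons)

  def convertOrTag(plan: PlanNode, schemaSupported: Boolean, pushedAggregate: Boolean): PlanNode = {
    // Collect every applicable fallback reason before deciding.
    val fallbackReasons = new ListBuffer[String]()
    if (!schemaSupported) fallbackReasons += s"Schema of ${plan.name} is not supported"
    if (pushedAggregate) fallbackReasons += "Comet does not support pushed aggregate"

    if (fallbackReasons.isEmpty) {
      plan.copy(name = s"Comet${plan.name}") // convert to the accelerated operator
    } else {
      tag(plan, fallbackReasons.toList) // fall back, keeping every reason for explain output
    }
  }

  def main(args: Array[String]): Unit = {
    println(convertOrTag(PlanNode("BatchScan"), schemaSupported = true, pushedAggregate = false))
    println(convertOrTag(PlanNode("BatchScan"), schemaSupported = false, pushedAggregate = true))
  }
}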

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 60 additions & 86 deletions
@@ -56,7 +56,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.CometConf._
 import org.apache.comet.CometExplainInfo.getActualPlan
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 import org.apache.comet.rules.RewriteJoin
 import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -114,78 +114,14 @@ class CometSparkSessionExtensions
       plan.transform {
         case scan if hasMetadataCol(scan) =>
           withInfo(scan, "Metadata column is not supported")
-          scan
 
         // data source V1
         case scanExec: FileSourceScanExec =>
           transformV1Scan(scanExec)
 
         // data source V2
-        case scanExec: BatchScanExec
-            if scanExec.scan.isInstanceOf[ParquetScan] &&
-              CometBatchScanExec.isSchemaSupported(
-                scanExec.scan.asInstanceOf[ParquetScan].readDataSchema) &&
-              CometBatchScanExec.isSchemaSupported(
-                scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema) &&
-              // Comet does not support pushedAggregate
-              scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isEmpty =>
-          val cometScan = CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan])
-          logInfo("Comet extension enabled for Scan")
-          CometBatchScanExec(
-            scanExec.copy(scan = cometScan),
-            runtimeFilters = scanExec.runtimeFilters)
-
-        // If it is a `ParquetScan` but unsupported by Comet, attach the exact
-        // reason to the plan.
-        case scanExec: BatchScanExec if scanExec.scan.isInstanceOf[ParquetScan] =>
-          val requiredSchema = scanExec.scan.asInstanceOf[ParquetScan].readDataSchema
-          val info1 = createMessage(
-            !CometBatchScanExec.isSchemaSupported(requiredSchema),
-            s"Schema $requiredSchema is not supported")
-          val readPartitionSchema = scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema
-          val info2 = createMessage(
-            !CometBatchScanExec.isSchemaSupported(readPartitionSchema),
-            s"Partition schema $readPartitionSchema is not supported")
-          // Comet does not support pushedAggregate
-          val info3 = createMessage(
-            scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isDefined,
-            "Comet does not support pushed aggregate")
-          withInfos(scanExec, Seq(info1, info2, info3).flatten.toSet)
-          scanExec
-
-        // Other datasource V2 scan
         case scanExec: BatchScanExec =>
-          scanExec.scan match {
-            // Iceberg scan, supported cases
-            case s: SupportsComet
-                if s.isCometEnabled &&
-                  CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema()) =>
-              logInfo(s"Comet extension enabled for ${scanExec.scan.getClass.getSimpleName}")
-              // When reading from Iceberg, we automatically enable type promotion
-              SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
-              CometBatchScanExec(
-                scanExec.clone().asInstanceOf[BatchScanExec],
-                runtimeFilters = scanExec.runtimeFilters)
-
-            // Iceberg scan but disabled or unsupported by Comet
-            case s: SupportsComet =>
-              val info1 = createMessage(
-                !s.isCometEnabled,
-                "Comet extension is not enabled for " +
-                  s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side")
-              val info2 = createMessage(
-                !CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema()),
-                "Comet extension is not enabled for " +
-                  s"${scanExec.scan.getClass.getSimpleName}: Schema not supported")
-              withInfos(scanExec, Seq(info1, info2).flatten.toSet)
-
-            // If it is data source V2 other than Parquet or Iceberg,
-            // attach the unsupported reason to the plan.
-            case _ =>
-              withInfo(scanExec, "Comet Scan only supports Parquet")
-              scanExec
-          }
-
+          transformV2Scan(scanExec)
       }
     }
   }
@@ -197,8 +133,7 @@ class CometSparkSessionExtensions
 
     if (COMET_DPP_FALLBACK_ENABLED.get() &&
       scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
-      withInfo(scanExec, "DPP not supported")
-      return scanExec
+      return withInfo(scanExec, "Dynamic Partition Pruning is not supported")
     }
 
     scanExec.relation match {
@@ -236,16 +171,70 @@ class CometSparkSessionExtensions
           CometScanExec(scanExec, session)
         } else {
           withInfo(scanExec, fallbackReasons.mkString(", "))
-          scanExec
         }
 
       case _ =>
         withInfo(scanExec, s"Unsupported relation ${scanExec.relation}")
-        scanExec
       }
     }
   }
 
+  private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {
+
+    scanExec.scan match {
+      case scan: ParquetScan =>
+        val fallbackReasons = new ListBuffer[String]()
+        if (!CometBatchScanExec.isSchemaSupported(scan.readDataSchema)) {
+          fallbackReasons += s"Schema ${scan.readDataSchema} is not supported"
+        }
+
+        if (!CometBatchScanExec.isSchemaSupported(scan.readPartitionSchema)) {
+          fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported"
+        }
+
+        if (scan.pushedAggregate.nonEmpty) {
+          fallbackReasons += "Comet does not support pushed aggregate"
+        }
+
+        if (fallbackReasons.isEmpty) {
+          val cometScan = CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan])
+          logInfo("Comet extension enabled for Scan")
+          CometBatchScanExec(
+            scanExec.copy(scan = cometScan),
+            runtimeFilters = scanExec.runtimeFilters)
+        } else {
+          withInfo(scanExec, fallbackReasons.mkString(", "))
+        }
+
+      // Iceberg scan
+      case s: SupportsComet =>
+        val fallbackReasons = new ListBuffer[String]()
+
+        if (!s.isCometEnabled) {
+          fallbackReasons += "Comet extension is not enabled for " +
+            s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side"
+        }
+
+        if (!CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema())) {
+          fallbackReasons += "Comet extension is not enabled for " +
+            s"${scanExec.scan.getClass.getSimpleName}: Schema not supported"
+        }
+
+        if (fallbackReasons.isEmpty) {
+          // When reading from Iceberg, we automatically enable type promotion
+          SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
+          CometBatchScanExec(
+            scanExec.clone().asInstanceOf[BatchScanExec],
+            runtimeFilters = scanExec.runtimeFilters)
+        } else {
+          withInfo(scanExec, fallbackReasons.mkString(", "))
+        }
+
+      case _ =>
+        withInfo(scanExec, "Comet Scan only supports Parquet and Iceberg Parquet file formats")
+    }
+  }
+
 case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
   private def applyCometShuffle(plan: SparkPlan): SparkPlan = {
     plan.transformUp {
@@ -377,7 +366,7 @@ class CometSparkSessionExtensions
       plan.transformUp {
         // Fully native scan for V1
         case scan: CometScanExec
-            if COMET_NATIVE_SCAN_IMPL.get().equals(CometConf.SCAN_NATIVE_DATAFUSION) =>
+            if COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
           val nativeOp = QueryPlanSerde.operator2Proto(scan).get
           CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
 
@@ -513,14 +502,12 @@ class CometSparkSessionExtensions
 
         case op: ShuffledHashJoinExec if !CometConf.COMET_EXEC_HASH_JOIN_ENABLED.get(conf) =>
           withInfo(op, "ShuffleHashJoin is not enabled")
-          op
 
         case op: ShuffledHashJoinExec if !op.children.forall(isCometNative) =>
           withInfo(
             op,
             "ShuffleHashJoin disabled because the following children are not native " +
               s"${explainChildNotNative(op)}")
-          op
 
         case op: BroadcastHashJoinExec
             if CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) &&
@@ -566,18 +553,15 @@ class CometSparkSessionExtensions
             op,
             "SortMergeJoin is not enabled because the following children are not native " +
               s"${explainChildNotNative(op)}")
-          op
 
         case op: SortMergeJoinExec if !CometConf.COMET_EXEC_SORT_MERGE_JOIN_ENABLED.get(conf) =>
           withInfo(op, "SortMergeJoin is not enabled")
-          op
 
         case op: SortMergeJoinExec if !op.children.forall(isCometNative(_)) =>
           withInfo(
             op,
             "SortMergeJoin is not enabled because the following children are not native " +
               s"${explainChildNotNative(op)}")
-          op
 
         case c @ CoalesceExec(numPartitions, child)
             if CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf)
@@ -592,14 +576,12 @@ class CometSparkSessionExtensions
 
         case c @ CoalesceExec(_, _) if !CometConf.COMET_EXEC_COALESCE_ENABLED.get(conf) =>
           withInfo(c, "Coalesce is not enabled")
-          c
 
         case op: CoalesceExec if !op.children.forall(isCometNative(_)) =>
           withInfo(
             op,
             "Coalesce is not enabled because the following children are not native " +
               s"${explainChildNotNative(op)}")
-          op
 
         case s: TakeOrderedAndProjectExec
             if isCometNative(s.child) && CometConf.COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED
@@ -629,7 +611,6 @@ class CometSparkSessionExtensions
             !isCometShuffleEnabled(conf),
             "TakeOrderedAndProject requires shuffle to be enabled")
           withInfo(s, Seq(info1, info2).flatten.mkString(","))
-          s
 
         case w: WindowExec =>
           newPlanWithProto(
@@ -655,14 +636,12 @@ class CometSparkSessionExtensions
 
         case u: UnionExec if !CometConf.COMET_EXEC_UNION_ENABLED.get(conf) =>
           withInfo(u, "Union is not enabled")
-          u
 
         case op: UnionExec if !op.children.forall(isCometNative(_)) =>
           withInfo(
             op,
             "Union is not enabled because the following children are not native " +
               s"${explainChildNotNative(op)}")
-          op
 
         // For AQE broadcast stage on a Comet broadcast exchange
         case s @ BroadcastQueryStageExec(_, _: CometBroadcastExchangeExec, _) =>
@@ -708,7 +687,6 @@ class CometSparkSessionExtensions
             plan,
             s"${plan.nodeName} is not native because the following children are not native " +
               s"${explainChildNotNative(plan)}")
-          plan
       }
 
       // this case should be checked only after the previous case checking for a
@@ -719,12 +697,10 @@ class CometSparkSessionExtensions
             op,
             "BroadcastHashJoin is not enabled because the following children are not native " +
               s"${explainChildNotNative(op)}")
-          op
 
         case op: BroadcastHashJoinExec
             if !CometConf.COMET_EXEC_BROADCAST_HASH_JOIN_ENABLED.get(conf) =>
           withInfo(op, "BroadcastHashJoin is not enabled")
-          op
 
         // For AQE shuffle stage on a Comet shuffle exchange
         case s @ ShuffleQueryStageExec(_, _: CometShuffleExchangeExec, _) =>
@@ -825,7 +801,6 @@ class CometSparkSessionExtensions
             "JVM shuffle: " +
               s"$typeInfo")
           withInfo(s, Seq(msg1, msg2, msg3).flatten.mkString(","))
-          s
       }
 
     case op =>
@@ -838,7 +813,6 @@ class CometSparkSessionExtensions
           case _ =>
             // An operator that is not supported by Comet
             withInfo(op, s"${op.nodeName} is not supported")
-            op
       }
     }
   }
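
One pattern repeats throughout the hunks above: every trailing op / scan / s / c / u returned after a withInfo call is deleted, and the DPP branch now writes return withInfo(scanExec, ...). That only works if withInfo returns the plan it annotates, so the call itself can serve as the last expression of a match arm. A hypothetical, simplified sketch of that apparent contract (not the actual Comet implementation, which tags real SparkPlan nodes):

object WithInfoSketch {
  // Stand-in plan node carrying explain annotations.
  final case class Node(name: String, info: Seq[String] = Seq.empty)

  // Attach the reasons and return the same node, so match arms can end here.
  def withInfo(plan: Node, reasons: String*): Node =
    plan.copy(info = plan.info ++ reasons)

  def main(args: Array[String]): Unit = {
    // Before the refactor each arm needed a trailing `plan` after the call;
    // now the tagging call doubles as the arm's result.
    println(withInfo(Node("SortMergeJoin"), "SortMergeJoin is not enabled"))
  }
}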

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -115,7 +115,7 @@ class CometExecSuite extends CometTestBase {
         "select * from dpp_fact join dpp_dim on fact_date = dim_date where dim_id > 7")
       val (_, cometPlan) = checkSparkAnswer(df)
       val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
-      assert(infos.contains("DPP not supported"))
+      assert(infos.contains("Dynamic Partition Pruning is not supported"))
     }
   }
 }
