Skip to content

Commit 2226cfa

Browse files
authored
fix: Refactor CometScanRule and fix bugs (#1483)
1 parent 56a5213 commit 2226cfa

File tree

3 files changed

+85
-68
lines changed

3 files changed

+85
-68
lines changed

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 58 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ package org.apache.comet
2121

2222
import java.nio.ByteOrder
2323

24+
import scala.collection.mutable.ListBuffer
25+
2426
import org.apache.spark.SparkConf
2527
import org.apache.spark.internal.Logging
2628
import org.apache.spark.network.util.ByteUnit
@@ -100,9 +102,6 @@ class CometSparkSessionExtensions
100102
plan
101103
} else {
102104

103-
def isDynamicPruningFilter(e: Expression): Boolean =
104-
e.exists(_.isInstanceOf[PlanExpression[_]])
105-
106105
def hasMetadataCol(plan: SparkPlan): Boolean = {
107106
plan.expressions.exists(_.exists {
108107
case a: Attribute =>
@@ -116,11 +115,9 @@ class CometSparkSessionExtensions
116115
withInfo(scan, "Metadata column is not supported")
117116
scan
118117

119-
case scanExec: FileSourceScanExec
120-
if COMET_DPP_FALLBACK_ENABLED.get() &&
121-
scanExec.partitionFilters.exists(isDynamicPruningFilter) =>
122-
withInfo(scanExec, "DPP not supported")
123-
scanExec
118+
// data source V1
119+
case scanExec: FileSourceScanExec =>
120+
transformV1Scan(scanExec)
124121

125122
// data source V2
126123
case scanExec: BatchScanExec
@@ -188,69 +185,62 @@ class CometSparkSessionExtensions
188185
scanExec
189186
}
190187

191-
// data source V1
192-
case scanExec @ FileSourceScanExec(
193-
HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
194-
_: Seq[_],
195-
requiredSchema,
196-
_,
197-
_,
198-
_,
199-
_,
200-
_,
201-
_)
202-
if CometScanExec.isFileFormatSupported(fileFormat)
203-
&& CometNativeScanExec.isSchemaSupported(requiredSchema)
204-
&& CometNativeScanExec.isSchemaSupported(partitionSchema)
205-
// TODO we only enable full native scan if COMET_EXEC_ENABLED is enabled
206-
// but this is not really what we want .. we currently insert `CometScanExec`
207-
// here and then it gets replaced with `CometNativeScanExec` in `CometExecRule`
208-
// but that only happens if `COMET_EXEC_ENABLED` is enabled
209-
&& COMET_EXEC_ENABLED.get()
210-
&& COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
211-
logInfo("Comet extension enabled for v1 full native Scan")
212-
CometScanExec(scanExec, session)
188+
}
189+
}
190+
}
213191

214-
// data source V1
215-
case scanExec @ FileSourceScanExec(
216-
HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
217-
_: Seq[_],
218-
requiredSchema,
219-
_,
220-
_,
221-
_,
222-
_,
223-
_,
224-
_)
225-
if CometScanExec.isFileFormatSupported(fileFormat)
226-
&& CometScanExec.isSchemaSupported(requiredSchema)
227-
&& CometScanExec.isSchemaSupported(partitionSchema) =>
228-
logInfo("Comet extension enabled for v1 Scan")
229-
CometScanExec(scanExec, session)
192+
private def isDynamicPruningFilter(e: Expression): Boolean =
193+
e.exists(_.isInstanceOf[PlanExpression[_]])
230194

231-
// data source v1 not supported case
232-
case scanExec @ FileSourceScanExec(
233-
HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
234-
_: Seq[_],
235-
requiredSchema,
236-
_,
237-
_,
238-
_,
239-
_,
240-
_,
241-
_) =>
242-
val info1 = createMessage(
243-
!CometScanExec.isFileFormatSupported(fileFormat),
244-
s"File format $fileFormat is not supported")
245-
val info2 = createMessage(
246-
!CometScanExec.isSchemaSupported(requiredSchema),
247-
s"Schema $requiredSchema is not supported")
248-
val info3 = createMessage(
249-
!CometScanExec.isSchemaSupported(partitionSchema),
250-
s"Partition schema $partitionSchema is not supported")
251-
withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString(","))
195+
private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
196+
197+
if (COMET_DPP_FALLBACK_ENABLED.get() &&
198+
scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
199+
withInfo(scanExec, "DPP not supported")
200+
return scanExec
201+
}
202+
203+
scanExec.relation match {
204+
case r: HadoopFsRelation =>
205+
val fallbackReasons = new ListBuffer[String]()
206+
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
207+
fallbackReasons += s"Unsupported file format ${r.fileFormat}"
208+
}
209+
210+
val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
211+
if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
212+
fallbackReasons +=
213+
s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
214+
}
215+
216+
val (schemaSupported, partitionSchemaSupported) = scanImpl match {
217+
case CometConf.SCAN_NATIVE_DATAFUSION =>
218+
(
219+
CometNativeScanExec.isSchemaSupported(scanExec.requiredSchema),
220+
CometNativeScanExec.isSchemaSupported(r.partitionSchema))
221+
case CometConf.SCAN_NATIVE_COMET | SCAN_NATIVE_ICEBERG_COMPAT =>
222+
(
223+
CometScanExec.isSchemaSupported(scanExec.requiredSchema),
224+
CometScanExec.isSchemaSupported(r.partitionSchema))
225+
}
226+
227+
if (!schemaSupported) {
228+
fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl"
229+
}
230+
if (!partitionSchemaSupported) {
231+
fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
232+
}
233+
234+
if (fallbackReasons.isEmpty) {
235+
CometScanExec(scanExec, session)
236+
} else {
237+
withInfo(scanExec, fallbackReasons.mkString(", "))
252238
scanExec
253-
}
239+
}
240+
241+
case _ =>
242+
withInfo(scanExec, s"Unsupported relation ${scanExec.relation}")
243+
scanExec
254244
}
255245
}
256246
}

spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
110110
}
111111

112112
test("columnar shuffle on nested struct including nulls") {
113+
// https://github.com/apache/datafusion-comet/issues/1538
114+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
113115
Seq(10, 201).foreach { numPartitions =>
114116
Seq("1.0", "10.0").foreach { ratio =>
115117
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
@@ -247,6 +249,9 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
247249
}
248250

249251
test("columnar shuffle on map") {
252+
// https://github.com/apache/datafusion-comet/issues/1538
253+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
254+
250255
def genTuples[K](num: Int, keys: Seq[K]): Seq[(
251256
Int,
252257
Map[K, Boolean],
@@ -580,6 +585,9 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
580585
}
581586

582587
test("columnar shuffle on array") {
588+
// https://github.com/apache/datafusion-comet/issues/1538
589+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
590+
583591
Seq(10, 201).foreach { numPartitions =>
584592
Seq("1.0", "10.0").foreach { ratio =>
585593
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
@@ -678,6 +686,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
678686
}
679687

680688
test("fix: Dictionary field should have distinct dict_id") {
689+
// https://github.com/apache/datafusion-comet/issues/1538
690+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
681691
Seq(10, 201).foreach { numPartitions =>
682692
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0") {
683693
withParquetTable(
@@ -695,6 +705,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
695705
}
696706

697707
test("dictionary shuffle") {
708+
// https://github.com/apache/datafusion-comet/issues/1538
709+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
698710
Seq(10, 201).foreach { numPartitions =>
699711
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0") {
700712
withParquetTable((0 until 10000).map(i => (1.toString, (i + 1).toLong)), "tbl") {
@@ -710,6 +722,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
710722
}
711723

712724
test("dictionary shuffle: fallback to string") {
725+
// https://github.com/apache/datafusion-comet/issues/1538
726+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
713727
Seq(10, 201).foreach { numPartitions =>
714728
withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1000000000.0") {
715729
withParquetTable((0 until 10000).map(i => (1.toString, (i + 1).toLong)), "tbl") {
@@ -725,6 +739,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
725739
}
726740

727741
test("fix: inMemSorter should be reset after spilling") {
742+
// https://github.com/apache/datafusion-comet/issues/1538
743+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
728744
withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") {
729745
assert(
730746
sql("SELECT * FROM tbl").repartition(201, $"_1").count() == sql("SELECT * FROM tbl")
@@ -733,6 +749,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
733749
}
734750

735751
test("fix: native Unsafe row accessors return incorrect results") {
752+
// https://github.com/apache/datafusion-comet/issues/1538
753+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
736754
Seq(10, 201).foreach { numPartitions =>
737755
withTempDir { dir =>
738756
val path = new Path(dir.toURI.toString, "test.parquet")
@@ -854,6 +872,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
854872
}
855873

856874
test("Columnar shuffle for large shuffle partition number") {
875+
// https://github.com/apache/datafusion-comet/issues/1538
876+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
857877
Seq(10, 200, 201).foreach { numPartitions =>
858878
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
859879
val df = sql("SELECT * FROM tbl")
@@ -872,6 +892,8 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
872892
}
873893

874894
test("hash-based columnar shuffle") {
895+
// https://github.com/apache/datafusion-comet/issues/1538
896+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
875897
Seq(10, 200, 201).foreach { numPartitions =>
876898
withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
877899
val df = sql("SELECT * FROM tbl")
@@ -900,6 +922,9 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
900922
}
901923

902924
test("columnar shuffle: different data type") {
925+
// https://github.com/apache/datafusion-comet/issues/1538
926+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
927+
903928
Seq(true, false).foreach { dictionaryEnabled =>
904929
withTempDir { dir =>
905930
val path = new Path(dir.toURI.toString, "test.parquet")

spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ class CometNativeShuffleSuite extends CometTestBase with AdaptiveSparkPlanHelper
5757
}
5858

5959
test("native shuffle: different data type") {
60+
// https://github.com/apache/datafusion-comet/issues/1538
61+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
6062
Seq(true, false).foreach { execEnabled =>
6163
Seq(true, false).foreach { dictionaryEnabled =>
6264
withTempDir { dir =>

0 commit comments

Comments
 (0)