
Commit 9b250c4

fix: Don't create CometScanExec for subclasses of ParquetFileFormat (apache#1129)
* Use exact class comparison for parquet scan
* Add test
* Add comment
1 parent: ebdde77
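
The root cause: a Scala type pattern such as _: ParquetFileFormat matches every subclass as well, so Comet was also claiming scans from formats like Delta's, which extend ParquetFileFormat. Below is a minimal, self-contained sketch of the difference, using stand-in classes rather than Spark's real ones (DeltaParquetFileFormat here is a hypothetical subclass, for illustration only):

object MatchSemanticsSketch {
  // Stand-ins for the real Spark classes; DeltaParquetFileFormat is a
  // hypothetical subclass used only to illustrate the matching behavior.
  class ParquetFileFormat
  class DeltaParquetFileFormat extends ParquetFileFormat

  // A type pattern matches the class and every subclass of it.
  def matchedByTypePattern(format: Any): Boolean = format match {
    case _: ParquetFileFormat => true
    case _ => false
  }

  // An exact class comparison matches only ParquetFileFormat itself.
  def matchedByExactClass(format: Any): Boolean =
    format.getClass == classOf[ParquetFileFormat]

  def main(args: Array[String]): Unit = {
    val delta = new DeltaParquetFileFormat
    println(matchedByTypePattern(delta)) // true: the old match claimed this scan
    println(matchedByExactClass(delta))  // false: the fixed check leaves it alone
  }
}

The commit replaces the type pattern with the exact getClass comparison, implemented in CometScanExec.isFileFormatSupported in the second diff below.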

File tree

3 files changed: +25 -6 lines

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 9 additions & 5 deletions

@@ -190,7 +190,7 @@ class CometSparkSessionExtensions
 
       // data source V1
       case scanExec @ FileSourceScanExec(
-            HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
+            HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
             _: Seq[_],
             requiredSchema,
             _,
@@ -199,14 +199,15 @@ class CometSparkSessionExtensions
             _,
             _,
             _)
-          if CometScanExec.isSchemaSupported(requiredSchema)
+          if CometScanExec.isFileFormatSupported(fileFormat)
+            && CometScanExec.isSchemaSupported(requiredSchema)
             && CometScanExec.isSchemaSupported(partitionSchema) =>
         logInfo("Comet extension enabled for v1 Scan")
         CometScanExec(scanExec, session)
 
       // data source v1 not supported case
       case scanExec @ FileSourceScanExec(
-            HadoopFsRelation(_, partitionSchema, _, _, _: ParquetFileFormat, _),
+            HadoopFsRelation(_, partitionSchema, _, _, fileFormat, _),
             _: Seq[_],
             requiredSchema,
             _,
@@ -216,12 +217,15 @@ class CometSparkSessionExtensions
             _,
             _) =>
         val info1 = createMessage(
+          !CometScanExec.isFileFormatSupported(fileFormat),
+          s"File format $fileFormat is not supported")
+        val info2 = createMessage(
           !CometScanExec.isSchemaSupported(requiredSchema),
           s"Schema $requiredSchema is not supported")
-        val info2 = createMessage(
+        val info3 = createMessage(
           !CometScanExec.isSchemaSupported(partitionSchema),
           s"Partition schema $partitionSchema is not supported")
-        withInfo(scanExec, Seq(info1, info2).flatten.mkString(","))
+        withInfo(scanExec, Seq(info1, info2, info3).flatten.mkString(","))
         scanExec
     }
   }
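
In the not-supported branch above, each check contributes an optional message and only the failing ones are reported. A small sketch of that assembly, assuming only what the diff implies about createMessage (it returns Some(message) when the condition holds, otherwise None; the real createMessage and withInfo helpers live in Comet):

object FallbackInfoSketch {
  // Stand-in with the signature implied by the diff above.
  def createMessage(condition: Boolean, message: => String): Option[String] =
    if (condition) Some(message) else None

  def main(args: Array[String]): Unit = {
    val info1 = createMessage(true, "File format CustomFormat is not supported")
    val info2 = createMessage(false, "Schema struct<a:int> is not supported")
    val info3 = createMessage(true, "Partition schema struct<p:date> is not supported")

    // flatten drops the Nones, so only the checks that actually failed end up
    // in the comma-separated explanation attached to the scan node.
    println(Seq(info1, info2, info3).flatten.mkString(","))
    // File format CustomFormat is not supported,Partition schema struct<p:date> is not supported
  }
}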

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 7 additions & 1 deletion

@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.comet.shims.ShimCometScanExec
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
 import org.apache.spark.sql.execution.metric._
 import org.apache.spark.sql.types._
@@ -503,4 +503,10 @@ object CometScanExec extends DataTypeSupport {
     scanExec.logicalLink.foreach(batchScanExec.setLogicalLink)
     batchScanExec
   }
+
+  def isFileFormatSupported(fileFormat: FileFormat): Boolean = {
+    // Only support Spark's built-in Parquet scans, not others such as Delta which use a subclass
+    // of ParquetFileFormat.
+    fileFormat.getClass().equals(classOf[ParquetFileFormat])
+  }
 }
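
Mirroring the test added below, the new guard can be exercised directly against the real Spark class (assuming spark-sql is on the classpath): the built-in format passes the exact-class check, while a subclass fails it even though it is still an instance of ParquetFileFormat:

import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

object ExactClassCheckDemo {
  def main(args: Array[String]): Unit = {
    class CustomParquetFileFormat extends ParquetFileFormat

    val builtin = new ParquetFileFormat()
    val custom = new CustomParquetFileFormat()

    // The comparison used by CometScanExec.isFileFormatSupported:
    println(builtin.getClass.equals(classOf[ParquetFileFormat])) // true
    println(custom.getClass.equals(classOf[ParquetFileFormat])) // false
    // An instance test would still succeed, which is exactly the old bug:
    println(custom.isInstanceOf[ParquetFileFormat]) // true
  }
}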

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 9 additions & 0 deletions

@@ -39,6 +39,7 @@ import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHas
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{CollectLimitExec, ProjectExec, SQLExecution, UnionExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
@@ -1883,6 +1884,14 @@ class CometExecSuite extends CometTestBase {
       }
     }
   }
+
+  test("Supported file formats for CometScanExec") {
+    assert(CometScanExec.isFileFormatSupported(new ParquetFileFormat()))
+
+    class CustomParquetFileFormat extends ParquetFileFormat {}
+
+    assert(!CometScanExec.isFileFormatSupported(new CustomParquetFileFormat()))
+  }
 }
 
 case class BucketedTableTestSpec(
