@@ -33,8 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometConf.{COMET_DPP_FALLBACK_ENABLED, COMET_EXEC_ENABLED, COMET_NATIVE_SCAN_IMPL, COMET_SCHEMA_EVOLUTION_ENABLED, SCAN_NATIVE_ICEBERG_COMPAT}
-import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo}
+import org.apache.comet.CometConf._
+import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 
 case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
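The new withInfos helper is imported alongside withInfo. Its definition lives in CometSparkSessionExtensions and is not shown in this diff; as a mental model only, here is a minimal sketch of what a set-based variant could look like, assuming fallback reasons are stored on the plan node through a TreeNodeTag (the tag name and the merge behavior are assumptions, not the project's actual code):

    import org.apache.spark.sql.catalyst.trees.TreeNodeTag
    import org.apache.spark.sql.execution.SparkPlan

    object ExplainInfoSketch {
      // Hypothetical tag key; the real extension object defines its own.
      private val infoTag = TreeNodeTag[Set[String]]("CometExplainInfo")

      // Record a set of distinct fallback reasons on a plan node, merging
      // with any reasons already attached to it, and return the node.
      def withInfos[T <: SparkPlan](node: T, reasons: Set[String]): T = {
        val existing = node.getTagValue(infoTag).getOrElse(Set.empty[String])
        node.setTagValue(infoTag, existing ++ reasons)
        node
      }
    }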
@@ -86,14 +86,21 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
         val fallbackReasons = new ListBuffer[String]()
         if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
           fallbackReasons += s"Unsupported file format ${r.fileFormat}"
-          return withInfo(scanExec, fallbackReasons.mkString(", "))
+          return withInfos(scanExec, fallbackReasons.toSet)
         }
 
         val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
         if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
           fallbackReasons +=
             s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
-          return withInfo(scanExec, fallbackReasons.mkString(", "))
+          return withInfos(scanExec, fallbackReasons.toSet)
+        }
+
+        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) {
+          // https://github.com/apache/datafusion-comet/issues/1719
+          fallbackReasons +=
+            "Full native scan disabled because bucketed scan is not supported"
+          return withInfos(scanExec, fallbackReasons.toSet)
         }
 
         val (schemaSupported, partitionSchemaSupported) = scanImpl match {
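The new bucketed-scan guard addresses the issue linked in the comment above. To see the shape of a query that would now fall back, here is a hedged sketch, assuming "spark.comet.scan.impl" is the key behind COMET_NATIVE_SCAN_IMPL (key name assumed, verify against CometConf) and that Spark's bucketed reads are enabled, which is the default:

    // Assumed key for COMET_NATIVE_SCAN_IMPL; verify against CometConf.
    spark.conf.set("spark.comet.scan.impl", "native_datafusion")

    // Writing with bucketBy produces a bucketed table; reading it back with
    // bucketed reads enabled makes scanExec.bucketedScan true.
    spark.range(1000)
      .write
      .bucketBy(8, "id")
      .sortBy("id")
      .saveAsTable("t_bucketed")

    // CometScanRule now keeps this scan on the Spark path and tags it with
    // "Full native scan disabled because bucketed scan is not supported".
    spark.table("t_bucketed").where("id > 10").show()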
@@ -117,7 +124,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
         if (schemaSupported && partitionSchemaSupported) {
           CometScanExec(scanExec, session)
         } else {
-          withInfo(scanExec, fallbackReasons.mkString(", "))
+          withInfos(scanExec, fallbackReasons.toSet)
         }
 
       case _ =>
@@ -152,7 +159,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
             scanExec.copy(scan = cometScan),
             runtimeFilters = scanExec.runtimeFilters)
         } else {
-          withInfo(scanExec, fallbackReasons.mkString(", "))
+          withInfos(scanExec, fallbackReasons.toSet)
         }
 
       // Iceberg scan
@@ -179,7 +186,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
             scanExec.clone().asInstanceOf[BatchScanExec],
             runtimeFilters = scanExec.runtimeFilters)
         } else {
-          withInfo(scanExec, fallbackReasons.mkString(", "))
+          withInfos(scanExec, fallbackReasons.toSet)
         }
 
       case other =>
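Mechanically, each fallback site changes from joining the ListBuffer with mkString to converting it to a Set before handing it to withInfos. One practical effect of toSet, beyond matching the new signature, is that repeated reasons collapse to a single entry, which the comma-joined string did not do. A small self-contained illustration:

    import scala.collection.mutable.ListBuffer

    val fallbackReasons = new ListBuffer[String]()
    fallbackReasons += "Unsupported type: map<string, int>"
    fallbackReasons += "Unsupported type: map<string, int>" // same reason hit twice

    // mkString repeats the duplicate; toSet reports each distinct reason once.
    println(fallbackReasons.mkString(", "))
    println(fallbackReasons.toSet)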