Skip to content

Commit 773eded

Browse files
committed
Fix existing Iceberg integration.
1 parent 46c507e commit 773eded

File tree

1 file changed

+28
-1
lines changed

1 file changed

+28
-1
lines changed

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanE
4646
import org.apache.comet.DataTypeSupport.isComplexType
4747
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
4848
import org.apache.comet.objectstore.NativeConfig
49-
import org.apache.comet.parquet.{CometParquetScan, Native}
49+
import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
5050
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
5151
import org.apache.comet.shims.CometTypeShim
5252

@@ -266,6 +266,33 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
266266
withInfos(scanExec, fallbackReasons.toSet)
267267
}
268268

269+
// Iceberg scan - patched version implementing SupportsComet interface
270+
case s: SupportsComet =>
271+
val fallbackReasons = new ListBuffer[String]()
272+
273+
if (!s.isCometEnabled) {
274+
fallbackReasons += "Comet extension is not enabled for " +
275+
s"${scanExec.scan.getClass.getSimpleName}: not enabled on data source side"
276+
}
277+
278+
val schemaSupported =
279+
CometBatchScanExec.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
280+
281+
if (!schemaSupported) {
282+
fallbackReasons += "Comet extension is not enabled for " +
283+
s"${scanExec.scan.getClass.getSimpleName}: Schema not supported"
284+
}
285+
286+
if (s.isCometEnabled && schemaSupported) {
287+
// When reading from Iceberg, we automatically enable type promotion
288+
SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
289+
CometBatchScanExec(
290+
scanExec.clone().asInstanceOf[BatchScanExec],
291+
runtimeFilters = scanExec.runtimeFilters)
292+
} else {
293+
withInfos(scanExec, fallbackReasons.toSet)
294+
}
295+
269296
// Iceberg scan - detected by class name (works with unpatched Iceberg)
270297
case _
271298
if scanExec.scan.getClass.getName ==

0 commit comments

Comments
 (0)