Skip to content

Commit 23f9194

Browse files
authored
chore: Refactor CometScanRule to improve scan selection and fallback logic (#2978)
1 parent af3bd81 commit 23f9194

File tree

1 file changed

+77
-36
lines changed

1 file changed

+77
-36
lines changed

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 77 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._
4242

4343
import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
4444
import org.apache.comet.CometConf._
45-
import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometLoaded, withInfo, withInfos}
45+
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
4646
import org.apache.comet.DataTypeSupport.isComplexType
4747
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
4848
import org.apache.comet.objectstore.NativeConfig
@@ -145,21 +145,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
145145
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
146146
return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
147147
}
148+
val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options)
148149

149-
var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
150-
151-
val hadoopConf = scanExec.relation.sparkSession.sessionState
152-
.newHadoopConfWithOptions(scanExec.relation.options)
153-
154-
// if scan is auto then pick the best available scan
155-
if (scanImpl == SCAN_AUTO) {
156-
scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
157-
}
158-
159-
if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
160-
return scanExec
161-
}
162-
150+
// TODO is this restriction valid for all native scan types?
163151
val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema)
164152
if (possibleDefaultValues.exists(d => {
165153
d != null && (d.isInstanceOf[ArrayBasedMapData] || d
@@ -170,32 +158,73 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
170158
// and arrays respectively.
171159
withInfo(
172160
scanExec,
173-
"Full native scan disabled because nested types for default values are not supported")
174-
}
175-
176-
if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
177-
if (!isEncryptionConfigSupported(hadoopConf)) {
178-
withInfo(scanExec, s"$scanImpl does not support encryption")
179-
}
161+
"Full native scan disabled because default values for nested types are not supported")
162+
return scanExec
180163
}
181164

182-
// check that schema is supported
183-
checkSchema(scanExec, scanImpl, r)
184-
185-
if (hasExplainInfo(scanExec)) {
186-
// could not accelerate, and plan is already tagged with fallback reasons
187-
scanExec
188-
} else {
189-
// this is confusing, but we always insert a CometScanExec here, which may replaced
190-
// with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
191-
CometScanExec(scanExec, session, scanImpl)
165+
COMET_NATIVE_SCAN_IMPL.get() match {
166+
case SCAN_AUTO =>
167+
// TODO add support for native_datafusion in the future
168+
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
169+
.orElse(nativeCometScan(session, scanExec, r, hadoopConf))
170+
.getOrElse(scanExec)
171+
case SCAN_NATIVE_DATAFUSION =>
172+
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
173+
case SCAN_NATIVE_ICEBERG_COMPAT =>
174+
nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
175+
case SCAN_NATIVE_COMET =>
176+
nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
192177
}
193178

194179
case _ =>
195180
withInfo(scanExec, s"Unsupported relation ${scanExec.relation}")
196181
}
197182
}
198183

184+
private def nativeDataFusionScan(
185+
session: SparkSession,
186+
scanExec: FileSourceScanExec,
187+
r: HadoopFsRelation,
188+
hadoopConf: Configuration): Option[SparkPlan] = {
189+
if (!CometNativeScan.isSupported(scanExec)) {
190+
return None
191+
}
192+
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
193+
withInfo(scanExec, s"$SCAN_NATIVE_DATAFUSION does not support encryption")
194+
return None
195+
}
196+
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
197+
return None
198+
}
199+
Some(CometScanExec(scanExec, session, SCAN_NATIVE_DATAFUSION))
200+
}
201+
202+
private def nativeIcebergCompatScan(
203+
session: SparkSession,
204+
scanExec: FileSourceScanExec,
205+
r: HadoopFsRelation,
206+
hadoopConf: Configuration): Option[SparkPlan] = {
207+
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
208+
withInfo(scanExec, s"$SCAN_NATIVE_ICEBERG_COMPAT does not support encryption")
209+
return None
210+
}
211+
if (!isSchemaSupported(scanExec, SCAN_NATIVE_ICEBERG_COMPAT, r)) {
212+
return None
213+
}
214+
Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
215+
}
216+
217+
private def nativeCometScan(
218+
session: SparkSession,
219+
scanExec: FileSourceScanExec,
220+
r: HadoopFsRelation,
221+
hadoopConf: Configuration): Option[SparkPlan] = {
222+
if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
223+
return None
224+
}
225+
Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
226+
}
227+
199228
private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {
200229

201230
scanExec.scan match {
@@ -612,20 +641,32 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
612641
private def isDynamicPruningFilter(e: Expression): Boolean =
613642
e.exists(_.isInstanceOf[PlanExpression[_]])
614643

615-
def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
644+
private def isSchemaSupported(
645+
scanExec: FileSourceScanExec,
646+
scanImpl: String,
647+
r: HadoopFsRelation): Boolean = {
616648
val fallbackReasons = new ListBuffer[String]()
617649
val typeChecker = CometScanTypeChecker(scanImpl)
618650
val schemaSupported =
619651
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
620652
if (!schemaSupported) {
621-
withInfo(scanExec, s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl")
653+
withInfo(
654+
scanExec,
655+
s"Unsupported schema ${scanExec.requiredSchema} " +
656+
s"for $scanImpl: ${fallbackReasons.mkString(", ")}")
657+
return false
622658
}
623659
val partitionSchemaSupported =
624660
typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
625661
if (!partitionSchemaSupported) {
626-
fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
662+
withInfo(
663+
scanExec,
664+
s"Unsupported partitioning schema ${r.partitionSchema} " +
665+
s"for $scanImpl: ${fallbackReasons
666+
.mkString(", ")}")
667+
return false
627668
}
628-
withInfos(scanExec, fallbackReasons.toSet)
669+
true
629670
}
630671
}
631672

0 commit comments

Comments (0)