Commit 24e4c12

feat: Improve fallback reporting for native_datafusion scan (#2879)
1 parent 2d3172d commit 24e4c12

File tree

2 files changed: +91 −65 lines changed

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 33 additions & 62 deletions
@@ -42,12 +42,13 @@ import org.apache.spark.sql.types._
 import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
 import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometLoaded, withInfo, withInfos}
 import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.{CometParquetScan, Native, SupportsComet}
 import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
+import org.apache.comet.serde.operator.CometNativeScan
 import org.apache.comet.shims.CometTypeShim

 /**
@@ -132,9 +133,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     }
   }

-  private def isDynamicPruningFilter(e: Expression): Boolean =
-    e.exists(_.isInstanceOf[PlanExpression[_]])
-
   private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {

     if (COMET_DPP_FALLBACK_ENABLED.get() &&
@@ -144,10 +142,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     scanExec.relation match {
       case r: HadoopFsRelation =>
-        val fallbackReasons = new ListBuffer[String]()
         if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
-          fallbackReasons += s"Unsupported file format ${r.fileFormat}"
-          return withInfos(scanExec, fallbackReasons.toSet)
+          return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
         }

         var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
@@ -160,42 +156,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
           scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
         }

-        // Native DataFusion doesn't support subqueries/dynamic pruning
-        if (scanImpl == SCAN_NATIVE_DATAFUSION &&
-          scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
-          fallbackReasons += "Native DataFusion scan does not support subqueries/dynamic pruning"
-          return withInfos(scanExec, fallbackReasons.toSet)
-        }
-
-        if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
-          fallbackReasons +=
-            s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
-          return withInfos(scanExec, fallbackReasons.toSet)
-        }
-
-        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && (SQLConf.get.ignoreCorruptFiles ||
-          scanExec.relation.options
-            .get("ignorecorruptfiles") // Spark sets this to lowercase.
-            .contains("true"))) {
-          fallbackReasons +=
-            "Full native scan disabled because ignoreCorruptFiles enabled"
-          return withInfos(scanExec, fallbackReasons.toSet)
-        }
-
-        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && (SQLConf.get.ignoreMissingFiles ||
-          scanExec.relation.options
-            .get("ignoremissingfiles") // Spark sets this to lowercase.
-            .contains("true"))) {
-          fallbackReasons +=
-            "Full native scan disabled because ignoreMissingFiles enabled"
-          return withInfos(scanExec, fallbackReasons.toSet)
-        }
-
-        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) {
-          // https://github.com/apache/datafusion-comet/issues/1719
-          fallbackReasons +=
-            "Full native scan disabled because bucketed scan is not supported"
-          return withInfos(scanExec, fallbackReasons.toSet)
+        if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
+          return scanExec
         }

         val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema)
@@ -206,36 +168,27 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
           // Spark already converted these to Java-native types, so we can't check SQL types.
           // ArrayBasedMapData, GenericInternalRow, GenericArrayData correspond to maps, structs,
           // and arrays respectively.
-          fallbackReasons +=
-            "Full native scan disabled because nested types for default values are not supported"
-          return withInfos(scanExec, fallbackReasons.toSet)
+          withInfo(
+            scanExec,
+            "Full native scan disabled because nested types for default values are not supported")
         }

         if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
           if (!isEncryptionConfigSupported(hadoopConf)) {
-            return withInfos(scanExec, fallbackReasons.toSet)
+            withInfo(scanExec, s"$scanImpl does not support encryption")
           }
         }

-        val typeChecker = CometScanTypeChecker(scanImpl)
-        val schemaSupported =
-          typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
-        val partitionSchemaSupported =
-          typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
-
-        if (!schemaSupported) {
-          fallbackReasons += s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl"
-        }
-        if (!partitionSchemaSupported) {
-          fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
-        }
+        // check that schema is supported
+        checkSchema(scanExec, scanImpl, r)

-        if (schemaSupported && partitionSchemaSupported) {
+        if (hasExplainInfo(scanExec)) {
+          // could not accelerate, and plan is already tagged with fallback reasons
+          scanExec
+        } else {
           // this is confusing, but we always insert a CometScanExec here, which may replaced
           // with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
           CometScanExec(scanExec, session, scanImpl)
-        } else {
-          withInfos(scanExec, fallbackReasons.toSet)
         }

       case _ =>
@@ -647,6 +600,24 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     }
   }

+  private def isDynamicPruningFilter(e: Expression): Boolean =
+    e.exists(_.isInstanceOf[PlanExpression[_]])
+
+  def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
+    val fallbackReasons = new ListBuffer[String]()
+    val typeChecker = CometScanTypeChecker(scanImpl)
+    val schemaSupported =
+      typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+    if (!schemaSupported) {
+      withInfo(scanExec, s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl")
+    }
+    val partitionSchemaSupported =
+      typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
+    if (!partitionSchemaSupported) {
+      fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
+    }
+    withInfos(scanExec, fallbackReasons.toSet)
+  }
 }

 case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with CometTypeShim {
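
The rule now reports fallback by tagging the scan node with a reason (withInfo) and later asking whether any reason was recorded (hasExplainInfo), rather than threading a ListBuffer of reasons through early returns. Below is a minimal, self-contained sketch of that tag-then-check pattern; the FallbackTagging object, its tag key, and the Seq[String] payload are illustrative assumptions, not Comet's actual internals.

```scala
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SparkPlan

// Sketch only: Comet stores its real fallback info under its own tag and type.
object FallbackTagging {
  // Hypothetical tag key for accumulating fallback reasons on a plan node.
  private val fallbackTag = TreeNodeTag[Seq[String]]("hypothetical.fallback.reasons")

  // Record a fallback reason on the plan node, appending to any already present.
  def withInfo[T <: SparkPlan](plan: T, reason: String): T = {
    val existing = plan.getTagValue(fallbackTag).getOrElse(Seq.empty)
    plan.setTagValue(fallbackTag, existing :+ reason)
    plan
  }

  // True if any fallback reason has been recorded on this node.
  def hasExplainInfo(plan: SparkPlan): Boolean =
    plan.getTagValue(fallbackTag).exists(_.nonEmpty)
}
```

With this shape, the rule can keep running checks after a failure and decide at the end: if anything was tagged, it returns the original scanExec so EXPLAIN can surface every recorded reason; otherwise it converts the node to CometScanExec.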

spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala

Lines changed: 58 additions & 3 deletions
@@ -23,27 +23,82 @@ import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._

 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, PlanExpression}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
 import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec}
+import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StructField, StructType}

 import org.apache.comet.{CometConf, ConfigEntry}
-import org.apache.comet.CometSparkSessionExtensions.withInfo
+import org.apache.comet.CometConf.COMET_EXEC_ENABLED
+import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, withInfo}
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.parquet.CometParquetUtils
-import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
+import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel}
 import org.apache.comet.serde.ExprOuterClass.Expr
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}

+/**
+ * Validation and serde logic for `native_datafusion` scans.
+ */
 object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {

+  /** Determine whether the scan is supported and tag the Spark plan with any fallback reasons */
+  def isSupported(scanExec: FileSourceScanExec): Boolean = {
+
+    if (hasExplainInfo(scanExec)) {
+      // this node has already been tagged with fallback reasons
+      return false
+    }
+
+    if (!COMET_EXEC_ENABLED.get()) {
+      withInfo(scanExec, s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled")
+    }
+
+    // Native DataFusion doesn't support subqueries/dynamic pruning
+    if (scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
+      withInfo(scanExec, "Native DataFusion scan does not support subqueries/dynamic pruning")
+    }
+
+    if (SQLConf.get.ignoreCorruptFiles ||
+      scanExec.relation.options
+        .get("ignorecorruptfiles") // Spark sets this to lowercase.
+        .contains("true")) {
+      withInfo(scanExec, "Full native scan disabled because ignoreCorruptFiles enabled")
+    }
+
+    if (SQLConf.get.ignoreMissingFiles ||
+      scanExec.relation.options
+        .get("ignoremissingfiles") // Spark sets this to lowercase.
+        .contains("true")) {
+      withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles enabled")
+    }
+
+    if (scanExec.bucketedScan) {
+      // https://github.com/apache/datafusion-comet/issues/1719
+      withInfo(scanExec, "Full native scan disabled because bucketed scan is not supported")
+    }
+
+    // the scan is supported if no fallback reasons were added to the node
+    !hasExplainInfo(scanExec)
+  }
+
+  private def isDynamicPruningFilter(e: Expression): Boolean =
+    e.exists(_.isInstanceOf[PlanExpression[_]])
+
   override def enabledConfig: Option[ConfigEntry[Boolean]] = None

+  override def getSupportLevel(operator: CometScanExec): SupportLevel = {
+    // all checks happen in CometScanRule before ScanExec is converted to CometScanExec, so
+    // we always report compatible here because this serde object is for the converted CometScanExec
+    Compatible()
+  }
+
   override def convert(
       scan: CometScanExec,
       builder: Operator.Builder,
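
The old guards each did `return withInfos(...)`, so only the first unsupported feature was ever reported; isSupported instead tags every failing check and decides support at the end, so a scan that is, say, both bucketed and running with ignoreCorruptFiles enabled reports both reasons. Here is a hedged sketch of that accumulate-then-decide flow, reusing the hypothetical FallbackTagging helpers from the sketch above:

```scala
import org.apache.spark.sql.execution.SparkPlan

object NativeScanSupportSketch {
  // `checks` pairs each predicate result with the reason to report on failure.
  def isSupportedSketch(plan: SparkPlan, checks: Seq[(Boolean, String)]): Boolean = {
    if (FallbackTagging.hasExplainInfo(plan)) {
      // an earlier rule already tagged this node with fallback reasons
      return false
    }
    // Tag every failing check instead of returning at the first failure,
    // so EXPLAIN can surface all applicable fallback reasons at once.
    checks.collect { case (false, reason) => reason }
      .foreach(reason => FallbackTagging.withInfo(plan, reason))
    // Supported only if no fallback reason was recorded.
    !FallbackTagging.hasExplainInfo(plan)
  }
}
```

For example, isSupportedSketch(scan, Seq((!scan.bucketedScan, "bucketed scan is not supported"), (COMET_EXEC_ENABLED.get(), "comet exec disabled"))) would tag both reasons when both checks fail, mirroring how isSupported calls withInfo once per failed guard.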
