
Commit 7d46ff2

chore: Add scanImpl attribute to CometScanExec (apache#1746)

1 parent: 093a244
21 files changed: +125 -130 lines changed

.github/workflows/miri.yml

Lines changed: 1 addition & 1 deletion

@@ -52,4 +52,4 @@ jobs:
       - name: Test with Miri
         run: |
           cd native
-          MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test
+          MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --lib --bins --tests --examples

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 5 deletions

@@ -104,11 +104,6 @@ object CometConf extends ShimCometConf {
         .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
         .toLowerCase(Locale.ROOT))
 
-  def isExperimentalNativeScan: Boolean = COMET_NATIVE_SCAN_IMPL.get() match {
-    case SCAN_NATIVE_DATAFUSION | SCAN_NATIVE_ICEBERG_COMPAT => true
-    case SCAN_NATIVE_COMET => false
-  }
-
   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
       .doc(
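
The deleted isExperimentalNativeScan helper classified the session-wide COMET_NATIVE_SCAN_IMPL setting. With scanImpl now carried on each scan node (see CometExecRule below), the same distinction can be drawn per scan; a hypothetical equivalent that could live in CometConf, not part of this commit, using the constants that remain in the file:

def isExperimentalScanImpl(scanImpl: String): Boolean = scanImpl match {
  // Same classification the removed helper made, now driven by a per-scan
  // value instead of the global COMET_NATIVE_SCAN_IMPL config entry.
  case SCAN_NATIVE_DATAFUSION | SCAN_NATIVE_ICEBERG_COMPAT => true
  case _ => false
}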

dev/diffs/3.4.3.diff

Lines changed: 1 addition & 1 deletion

@@ -961,7 +961,7 @@ index 75eabcb96f2..36e3318ad7e 100644
           _.asInstanceOf[FileScanRDD].filePartitions.forall(
             _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +    case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+           fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++           fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +      partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +      fs.inputRDDs().forall(
 +        _.asInstanceOf[FileScanRDD].filePartitions.forall(
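
The one-wildcard change in this patch, repeated verbatim in the three sibling patches below, exists because CometScanExec gained a constructor field (presumably the new scanImpl this commit adds), and positional extractor patterns must match the case class's arity exactly. A stand-in illustration, not Comet code:

object ArityExample {
  // Adding a field to a case class changes its extractor's arity.
  case class Scan(scanImpl: String, relation: String, partitionFilters: Seq[String])

  // Before scanImpl was added, this pattern needed one fewer wildcard.
  def hasFilters(plan: Any): Boolean = plan match {
    case Scan(_, _, partitionFilters) => partitionFilters.nonEmpty
    case _ => false
  }
}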

dev/diffs/3.5.4.diff

Lines changed: 1 addition & 1 deletion

@@ -1092,7 +1092,7 @@ index 260c992f1ae..b9d8e22337c 100644
           _.asInstanceOf[FileScanRDD].filePartitions.forall(
             _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +    case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+           fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++           fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +      partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +      fs.inputRDDs().forall(
 +        _.asInstanceOf[FileScanRDD].filePartitions.forall(

dev/diffs/3.5.5.diff

Lines changed: 1 addition & 1 deletion

@@ -963,7 +963,7 @@ index 04702201f82..6cc2b01b7f3 100644
           _.asInstanceOf[FileScanRDD].filePartitions.forall(
             _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +    case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+           fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++           fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +      partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +      fs.inputRDDs().forall(
 +        _.asInstanceOf[FileScanRDD].filePartitions.forall(

dev/diffs/4.0.0-preview1.diff

Lines changed: 1 addition & 1 deletion

@@ -1035,7 +1035,7 @@ index 68f14f13bbd..174636cefb5 100644
           _.asInstanceOf[FileScanRDD].filePartitions.forall(
             _.files.forall(_.urlEncodedPath.contains("p=0"))))
 +    case WholeStageCodegenExec(ColumnarToRowExec(InputAdapter(
-+           fs @ CometScanExec(_, _, _, partitionFilters, _, _, _, _, _, _)))) =>
++           fs @ CometScanExec(_, _, _, _, partitionFilters, _, _, _, _, _, _)))) =>
 +      partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
 +      fs.inputRDDs().forall(
 +        _.asInstanceOf[FileScanRDD].filePartitions.forall(

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 6 additions & 3 deletions

@@ -56,7 +56,10 @@ import org.apache.comet.vector.CometVector
  *   in [[org.apache.comet.CometSparkSessionExtensions]]
  * - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
  */
-class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with ShimSQLConf {
+class CometParquetFileFormat(scanImpl: String)
+    extends ParquetFileFormat
+    with MetricsSupport
+    with ShimSQLConf {
   override def shortName(): String = "parquet"
   override def toString: String = "CometParquet"
   override def hashCode(): Int = getClass.hashCode()
@@ -100,8 +103,8 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
 
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
-    val nativeIcebergCompat =
-      CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+
+    val nativeIcebergCompat = scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT
 
     (file: PartitionedFile) => {
       val sharedConf = broadcastedHadoopConf.value.value
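
With scanImpl injected at construction, the reader flavor is fixed when the format object is created instead of being looked up from SQLConf inside buildReaderWithPartitionValues. A minimal sketch of that pattern with stand-in names (the sketch types and the constant's value are illustrative assumptions):

object FormatSketch {
  // Assumed to mirror CometConf.SCAN_NATIVE_ICEBERG_COMPAT's value.
  val ScanNativeIcebergCompat = "native_iceberg_compat"

  class ParquetFormatSketch(scanImpl: String) {
    // Decided once from constructor state; no config lookup at reader-build time.
    val nativeIcebergCompat: Boolean = scanImpl == ScanNativeIcebergCompat
  }
}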

spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala

Lines changed: 1 addition & 11 deletions

@@ -46,6 +46,7 @@ import org.apache.comet.{CometConf, CometRuntimeException}
 import org.apache.comet.shims.ShimSQLConf
 
 case class CometParquetPartitionReaderFactory(
+    usingDataFusionReader: Boolean,
     @transient sqlConf: SQLConf,
     broadcastedConf: Broadcast[SerializableConfiguration],
     readDataSchema: StructType,
@@ -71,17 +72,6 @@ case class CometParquetPartitionReaderFactory(
   // Comet specific configurations
   private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)
 
-  @transient private lazy val usingDataFusionReader: Boolean = {
-    val conf = broadcastedConf.value.value
-    conf.getBoolean(
-      CometConf.COMET_NATIVE_SCAN_ENABLED.key,
-      CometConf.COMET_NATIVE_SCAN_ENABLED.defaultValue.get) &&
-    conf
-      .get(
-        CometConf.COMET_NATIVE_SCAN_IMPL.key,
-        CometConf.COMET_NATIVE_SCAN_IMPL.defaultValueString)
-      .equalsIgnoreCase(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
-  }
   // This is only called at executor on a Broadcast variable, so we don't want it to be
   // materialized at driver.
   @transient private lazy val preFetchEnabled = {
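
The removed lazy val recomputed usingDataFusionReader on each executor from the broadcast Hadoop configuration; it is now an explicit constructor parameter decided by the caller. Sketched with stand-in types, not Comet's actual classes:

object FactorySketch {
  case class ReaderFactorySketch(
      // Explicit caller-side decision, replacing the executor-side
      // @transient lazy val derived from broadcast configuration.
      usingDataFusionReader: Boolean,
      batchSize: Int)

  // Hypothetical call site; the v2 path (CometParquetScan, next file) passes false.
  val factory = ReaderFactorySketch(usingDataFusionReader = false, batchSize = 8192)
}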

spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala

Lines changed: 1 addition & 0 deletions

@@ -58,6 +58,7 @@ trait CometParquetScan extends FileScan with MetricsSupport {
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     CometParquetPartitionReaderFactory(
+      usingDataFusionReader = false, // this value is not used since this is v2 scan
       sqlConf,
       broadcastedConf,
       readDataSchema,

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 2 additions & 3 deletions

@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.{CometConf, ExtendedExplainInfo}
-import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_NATIVE_SCAN_IMPL, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
+import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
 import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
@@ -154,8 +154,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
 
     plan.transformUp {
       // Fully native scan for V1
-      case scan: CometScanExec
-          if COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
+      case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
         val nativeOp = QueryPlanSerde.operator2Proto(scan).get
         CometNativeScanExec(nativeOp, scan.wrapped, scan.session)
 
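
The rewritten rule branches on the scan node's own scanImpl field rather than the global COMET_NATIVE_SCAN_IMPL entry, so the rewrite decision is made per scan node. A condensed sketch of that dispatch with stand-in plan types (not Comet's real classes; the constant's value is an assumption):

object ExecRuleSketch {
  sealed trait PlanSketch
  case class ScanSketch(scanImpl: String) extends PlanSketch
  case class NativeScanSketch(wrapped: ScanSketch) extends PlanSketch

  // Assumed to mirror CometConf.SCAN_NATIVE_DATAFUSION's value.
  val ScanNativeDataFusion = "native_datafusion"

  def rewrite(plan: PlanSketch): PlanSketch = plan match {
    // Per-node decision: only scans declaring the DataFusion impl become native.
    case scan: ScanSketch if scan.scanImpl == ScanNativeDataFusion => NativeScanSketch(scan)
    case other => other
  }
}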
