Skip to content

Commit 06ed88b

Browse files
authored
feat: Change default value of COMET_NATIVE_SCAN_IMPL to auto (#1933)
1 parent ded4022 commit 06ed88b

File tree

5 files changed

+56
-88
lines changed

5 files changed

+56
-88
lines changed

.github/workflows/spark_sql_test.yml

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ env:
4040
RUST_VERSION: stable
4141

4242
jobs:
43-
spark-sql-native-comet:
43+
spark-sql-auto-scan:
4444
strategy:
4545
matrix:
4646
os: [ubuntu-24.04]
@@ -75,7 +75,46 @@ jobs:
7575
run: |
7676
cd apache-spark
7777
rm -rf /root/.m2/repository/org/apache/parquet # somehow the parquet cache requires cleanup
78-
ENABLE_COMET=true ENABLE_COMET_SHUFFLE=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
78+
ENABLE_COMET=true ENABLE_COMET_SHUFFLE=true build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
79+
env:
80+
LC_ALL: "C.UTF-8"
81+
82+
spark-sql-native-native-comet:
83+
strategy:
84+
matrix:
85+
os: [ ubuntu-24.04 ]
86+
java-version: [ 11 ]
87+
spark-version: [ { short: '3.4', full: '3.4.3' }, { short: '3.5', full: '3.5.6' } ]
88+
module:
89+
- { name: "catalyst", args1: "catalyst/test", args2: "" }
90+
- { name: "sql/core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest }
91+
- { name: "sql/core-2", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest" }
92+
- { name: "sql/core-3", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest" }
93+
- { name: "sql/hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest" }
94+
- { name: "sql/hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest" }
95+
- { name: "sql/hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest" }
96+
fail-fast: false
97+
name: spark-sql-native-comet-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.java-version }}
98+
runs-on: ${{ matrix.os }}
99+
container:
100+
image: amd64/rust
101+
steps:
102+
- uses: actions/checkout@v4
103+
- name: Setup Rust & Java toolchain
104+
uses: ./.github/actions/setup-builder
105+
with:
106+
rust-version: ${{env.RUST_VERSION}}
107+
jdk-version: ${{ matrix.java-version }}
108+
- name: Setup Spark
109+
uses: ./.github/actions/setup-spark-builder
110+
with:
111+
spark-version: ${{ matrix.spark-version.full }}
112+
spark-short-version: ${{ matrix.spark-version.short }}
113+
- name: Run Spark tests
114+
run: |
115+
cd apache-spark
116+
rm -rf /root/.m2/repository/org/apache/parquet # somehow the parquet cache requires cleanup
117+
ENABLE_COMET=true ENABLE_COMET_SHUFFLE=true COMET_PARQUET_SCAN_IMPL=native_comet build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
79118
env:
80119
LC_ALL: "C.UTF-8"
81120

.github/workflows/spark_sql_test_native_auto.yml

Lines changed: 0 additions & 81 deletions
This file was deleted.

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ object CometConf extends ShimCometConf {
103103
.checkValues(
104104
Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
105105
.createWithDefault(sys.env
106-
.getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
106+
.getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)
107107
.toLowerCase(Locale.ROOT))
108108

109109
val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
3434
import org.apache.spark.sql.internal.SQLConf
3535
import org.apache.spark.sql.types._
3636

37-
import org.apache.comet.{CometConf, DataTypeSupport}
37+
import org.apache.comet.{CometConf, CometSparkSessionExtensions, DataTypeSupport}
3838
import org.apache.comet.CometConf._
3939
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
4040
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
@@ -261,6 +261,10 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
261261

262262
val fallbackReasons = new ListBuffer[String]()
263263

264+
if (CometSparkSessionExtensions.isSpark40Plus) {
265+
fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT is not implemented for Spark 4.0.0"
266+
}
267+
264268
// native_iceberg_compat only supports local filesystem and S3
265269
if (!scanExec.relation.inputFiles
266270
.forall(path => path.startsWith("file://") || path.startsWith("s3a://"))) {

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2211,6 +2211,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
22112211
}
22122212

22132213
test("get_struct_field - select primitive fields") {
2214+
val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get()
2215+
assume(!(scanImpl == CometConf.SCAN_AUTO && CometSparkSessionExtensions.isSpark40Plus))
22142216
withTempPath { dir =>
22152217
// create input file with Comet disabled
22162218
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2225,7 +2227,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
22252227
val df = spark.read.parquet(dir.toString()).select("nested1.id")
22262228
// Comet's original scan does not support structs.
22272229
// The plan will have a Comet Scan only if scan impl is native_datafusion or native_iceberg_compat
2228-
if (!CometConf.COMET_NATIVE_SCAN_IMPL.get().equals(CometConf.SCAN_NATIVE_COMET)) {
2230+
if (!scanImpl.equals(CometConf.SCAN_NATIVE_COMET)) {
22292231
checkSparkAnswerAndOperator(df)
22302232
} else {
22312233
checkSparkAnswer(df)
@@ -2234,6 +2236,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
22342236
}
22352237

22362238
test("get_struct_field - select subset of struct") {
2239+
val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get()
2240+
assume(!(scanImpl == CometConf.SCAN_AUTO && CometSparkSessionExtensions.isSpark40Plus))
22372241
withTempPath { dir =>
22382242
// create input file with Comet disabled
22392243
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2255,7 +2259,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
22552259
val df = spark.read.parquet(dir.toString())
22562260
// Comet's original scan does not support structs.
22572261
// The plan will have a Comet Scan only if scan impl is native_datafusion or native_iceberg_compat
2258-
if (!CometConf.COMET_NATIVE_SCAN_IMPL.get().equals(CometConf.SCAN_NATIVE_COMET)) {
2262+
if (scanImpl != CometConf.SCAN_NATIVE_COMET) {
22592263
checkSparkAnswerAndOperator(df.select("nested1.id"))
22602264
checkSparkAnswerAndOperator(df.select("nested1.nested2"))
22612265
checkSparkAnswerAndOperator(df.select("nested1.nested2.id"))
@@ -2270,6 +2274,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
22702274
}
22712275

22722276
test("get_struct_field - read entire struct") {
2277+
val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get()
2278+
assume(!(scanImpl == CometConf.SCAN_AUTO && CometSparkSessionExtensions.isSpark40Plus))
22732279
withTempPath { dir =>
22742280
// create input file with Comet disabled
22752281
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2291,7 +2297,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
22912297
val df = spark.read.parquet(dir.toString()).select("nested1.id")
22922298
// Comet's original scan does not support structs.
22932299
// The plan will have a Comet Scan only if scan impl is native_datafusion or native_iceberg_compat
2294-
if (!CometConf.COMET_NATIVE_SCAN_IMPL.get().equals(CometConf.SCAN_NATIVE_COMET)) {
2300+
if (scanImpl != CometConf.SCAN_NATIVE_COMET) {
22952301
checkSparkAnswerAndOperator(df)
22962302
} else {
22972303
checkSparkAnswer(df)

0 commit comments

Comments (0)