Commit 17a36bc

feat: Add experimental auto mode for COMET_PARQUET_SCAN_IMPL (#1747)
1 parent 0b4d75e commit 17a36bc

File tree

9 files changed, +233 -58 lines changed
Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Spark SQL Tests (native_auto)
+
+concurrency:
+  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
+  cancel-in-progress: true
+
+on:
+  # manual trigger
+  # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
+  workflow_dispatch:
+
+env:
+  RUST_VERSION: stable
+
+jobs:
+  spark-sql-catalyst-native-auto:
+    strategy:
+      matrix:
+        os: [ubuntu-24.04]
+        java-version: [11]
+        spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.5'}]
+        module:
+          - {name: "catalyst", args1: "catalyst/test", args2: ""}
+          - {name: "sql/core-1", args1: "", args2: "sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest"}
+          - {name: "sql/core-2", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.ExtendedSQLTest"}
+          - {name: "sql/core-3", args1: "", args2: "sql/testOnly * -- -n org.apache.spark.tags.SlowSQLTest"}
+          - {name: "sql/hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
+          - {name: "sql/hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"}
+          - {name: "sql/hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
+      fail-fast: false
+    name: spark-sql-native-auto-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.java-version }}
+    runs-on: ${{ matrix.os }}
+    container:
+      image: amd64/rust
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup Rust & Java toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: ${{env.RUST_VERSION}}
+          jdk-version: ${{ matrix.java-version }}
+      - name: Setup Spark
+        uses: ./.github/actions/setup-spark-builder
+        with:
+          spark-version: ${{ matrix.spark-version.full }}
+          spark-short-version: ${{ matrix.spark-version.short }}
+      - name: Run Spark tests
+        run: |
+          cd apache-spark
+          rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
+          ENABLE_COMET=true ENABLE_COMET_SHUFFLE=true COMET_PARQUET_SCAN_IMPL=auto build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
+        env:
+          LC_ALL: "C.UTF-8"
+
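The `COMET_PARQUET_SCAN_IMPL=auto` variable in the run step above is consumed on the JVM side as the default value of `spark.comet.scan.impl` (see the `CometConf` change below). A minimal sketch of that resolution, mirroring the `createWithDefault` logic in this commit:

    import java.util.Locale

    // The env var overrides the built-in default ("native_comet");
    // the value is lower-cased before validation against the allowed set.
    val scanImplDefault: String =
      sys.env
        .getOrElse("COMET_PARQUET_SCAN_IMPL", "native_comet")
        .toLowerCase(Locale.ROOT)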

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 6 additions & 4 deletions
@@ -86,6 +86,7 @@ object CometConf extends ShimCometConf {
   val SCAN_NATIVE_COMET = "native_comet"
   val SCAN_NATIVE_DATAFUSION = "native_datafusion"
   val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
+  val SCAN_AUTO = "auto"
 
   val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
     .doc(
@@ -95,11 +96,12 @@ object CometConf extends ShimCometConf {
         "parquet file reader and native column decoding. Supports simple types only " +
         s"'$SCAN_NATIVE_DATAFUSION' is a fully native implementation of scan based on DataFusion" +
         s"'$SCAN_NATIVE_ICEBERG_COMPAT' is a native implementation that exposes apis to read " +
-        "parquet columns natively.")
+        s"parquet columns natively. $SCAN_AUTO chooses the best scan.")
     .internal()
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT))
+    .checkValues(
+      Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
     .createWithDefault(sys.env
       .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
       .toLowerCase(Locale.ROOT))
@@ -587,8 +589,8 @@ object CometConf extends ShimCometConf {
   val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
     conf("spark.comet.scan.allowIncompatible")
       .doc(
-        "Comet is not currently fully compatible with Spark for all datatypes. " +
-          s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
+        "Some Comet scan implementations are not currently fully compatible with Spark for " +
+          s"all datatypes. Set this config to true to allow them anyway. $COMPAT_GUIDE.")
       .booleanConf
       .createWithDefault(false)
 
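Taken together, the change above means the experimental mode can be enabled either through the `COMET_PARQUET_SCAN_IMPL=auto` environment variable or by setting the (internal) `spark.comet.scan.impl` config directly. A minimal sketch, assuming a standard Comet deployment (the plugin class and the Comet jar on the classpath are prerequisites from the Comet install docs, not part of this commit):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("comet-auto-scan")
      .config("spark.plugins", "org.apache.spark.CometPlugin")
      // Comet exec must be on, otherwise selectScan falls back to native_comet
      .config("spark.comet.exec.enabled", "true")
      // experimental: let CometScanRule pick the best available scan
      .config("spark.comet.scan.impl", "auto")
      .getOrCreate()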

docs/source/user-guide/compatibility.md

Lines changed: 11 additions & 0 deletions
@@ -50,6 +50,8 @@ implementation:
 
 The new scans currently have the following limitations:
 
+Issues common to both `native_datafusion` and `native_iceberg_compat`:
+
 - When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
   or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
   logical types. Arrow-based readers, such as DataFusion and Comet, do respect these types and read the data as unsigned
@@ -58,12 +60,21 @@ types (regardless of the logical type). This behavior can be disabled by setting
   `spark.comet.scan.allowIncompatible=true`.
 - There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more
   information.
+- Reading maps containing complex types can result in errors or incorrect results [#1754]
+- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
 - There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
 - Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with `native_datafusion` scan.
 
+Issues specific to `native_datafusion`:
+
+- Bucketed scans are not supported
+- No support for row indexes
+
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1542]: https://github.com/apache/datafusion-comet/issues/1542
+[#1754]: https://github.com/apache/datafusion-comet/issues/1754
+[#1758]: https://github.com/apache/datafusion-comet/issues/1758
 [Comet Tuning Guide]: tuning.md
 
 ## ANSI mode
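The common limitations above are the same conditions that `spark.comet.scan.allowIncompatible` guards. A one-line illustrative snippet (assumes an active `SparkSession` named `spark`):

    // Opt in to scan behavior known to differ from Spark (e.g. unsigned
    // UINT_8/UINT_16 logical types); review the compatibility guide first.
    spark.conf.set("spark.comet.scan.allowIncompatible", "true")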

docs/source/user-guide/configs.md

Lines changed: 1 addition & 1 deletion
@@ -83,7 +83,7 @@ Comet provides the following configuration settings.
 | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
 | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
 | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| spark.comet.scan.allowIncompatible | Comet is not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
+| spark.comet.scan.allowIncompatible | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false |
 | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |

docs/templates/compatibility-template.md

Lines changed: 11 additions & 0 deletions
@@ -50,6 +50,8 @@ implementation:
 
 The new scans currently have the following limitations:
 
+Issues common to both `native_datafusion` and `native_iceberg_compat`:
+
 - When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
   or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
   logical types. Arrow-based readers, such as DataFusion and Comet, do respect these types and read the data as unsigned
@@ -58,12 +60,21 @@ The new scans currently have the following limitations:
   `spark.comet.scan.allowIncompatible=true`.
 - There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more
   information.
+- Reading maps containing complex types can result in errors or incorrect results [#1754]
+- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
 - There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
 - Setting Spark configs `ignoreMissingFiles` or `ignoreCorruptFiles` to `true` is not compatible with `native_datafusion` scan.
 
+Issues specific to `native_datafusion`:
+
+- Bucketed scans are not supported
+- No support for row indexes
+
 [#1545]: https://github.com/apache/datafusion-comet/issues/1545
 [#1542]: https://github.com/apache/datafusion-comet/issues/1542
+[#1754]: https://github.com/apache/datafusion-comet/issues/1754
+[#1758]: https://github.com/apache/datafusion-comet/issues/1758
 [Comet Tuning Guide]: tuning.md
 
 ## ANSI mode

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 59 additions & 2 deletions
@@ -105,8 +105,14 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
           return withInfos(scanExec, fallbackReasons.toSet)
         }
 
-        val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
-        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
+        var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
+
+        // if scan is auto then pick the best available scan
+        if (scanImpl == SCAN_AUTO) {
+          scanImpl = selectScan(scanExec, r.partitionSchema)
+        }
+
+        if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
           fallbackReasons +=
             s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
           return withInfos(scanExec, fallbackReasons.toSet)
@@ -251,6 +257,57 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
     }
   }
 
+  private def selectScan(scanExec: FileSourceScanExec, partitionSchema: StructType): String = {
+    // TODO these checks are not yet exhaustive. For example, native_iceberg_compat does
+    // not support reading from S3
+
+    val fallbackReasons = new ListBuffer[String]()
+
+    val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
+    val schemaSupported =
+      typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+    val partitionSchemaSupported =
+      typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
+
+    def isComplexType(dt: DataType): Boolean = dt match {
+      case _: StructType | _: ArrayType | _: MapType => true
+      case _ => false
+    }
+
+    def hasMapsContainingStructs(dataType: DataType): Boolean = {
+      dataType match {
+        case s: StructType => s.exists(field => hasMapsContainingStructs(field.dataType))
+        case a: ArrayType => hasMapsContainingStructs(a.elementType)
+        case m: MapType => isComplexType(m.keyType) || isComplexType(m.valueType)
+        case _ => false
+      }
+    }
+
+    val knownIssues =
+      scanExec.requiredSchema.exists(field => hasMapsContainingStructs(field.dataType)) ||
+        partitionSchema.exists(field => hasMapsContainingStructs(field.dataType))
+
+    if (knownIssues) {
+      fallbackReasons += "There are known issues with maps containing structs when using " +
+        s"$SCAN_NATIVE_ICEBERG_COMPAT"
+    }
+
+    val cometExecEnabled = COMET_EXEC_ENABLED.get()
+    if (!cometExecEnabled) {
+      fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
+    }
+
+    if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues) {
+      logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
+      SCAN_NATIVE_ICEBERG_COMPAT
+    } else {
+      logInfo(
+        s"Auto scan mode falling back to $SCAN_NATIVE_COMET due to " +
+          s"${fallbackReasons.mkString(", ")}")
+      SCAN_NATIVE_COMET
+    }
+  }
+
 }
 
 case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport {
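To make the fallback condition concrete, here is a self-contained sketch: the two helpers are copied from `selectScan` above, while the example schemas are hypothetical, showing which shapes keep `native_iceberg_compat` eligible and which trigger the known-issue fallback to `native_comet`:

    import org.apache.spark.sql.types._

    def isComplexType(dt: DataType): Boolean = dt match {
      case _: StructType | _: ArrayType | _: MapType => true
      case _ => false
    }

    def hasMapsContainingStructs(dataType: DataType): Boolean = dataType match {
      case s: StructType => s.exists(field => hasMapsContainingStructs(field.dataType))
      case a: ArrayType => hasMapsContainingStructs(a.elementType)
      case m: MapType => isComplexType(m.keyType) || isComplexType(m.valueType)
      case _ => false
    }

    // map<string, int>: no complex key or value, so no known issue
    val simple = new StructType().add("m", MapType(StringType, IntegerType))
    // map<string, struct<x: int>>: complex value, triggers the fallback
    val nested = new StructType()
      .add("m", MapType(StringType, new StructType().add("x", IntegerType)))

    assert(!simple.exists(f => hasMapsContainingStructs(f.dataType)))
    assert(nested.exists(f => hasMapsContainingStructs(f.dataType)))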
