Commit 3d12780

fix: Change default value of COMET_SCAN_ALLOW_INCOMPATIBLE and add documentation (#1398)
* change default value of COMET_SCAN_ALLOW_INCOMPATIBLE and add documentation
* docs
* rename
* address feedback
* rename method based on feedback
1 parent 4d63daf commit 3d12780

11 files changed: +110 -37 lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 1 addition & 1 deletion
@@ -614,7 +614,7 @@ object CometConf extends ShimCometConf {
         "Comet is not currently fully compatible with Spark for all datatypes. " +
           s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)

   val COMET_EXPR_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
     conf("spark.comet.expression.allowIncompatible")

docs/source/user-guide/compatibility.md

Lines changed: 31 additions & 0 deletions
@@ -17,12 +17,43 @@ specific language governing permissions and limitations
 under the License.
 -->

+<!--
+TO MODIFY THIS CONTENT MAKE SURE THAT YOU MAKE YOUR CHANGES TO THE TEMPLATE FILE
+(docs/templates/compatibility-template.md) AND NOT THE GENERATED FILE
+(docs/source/user-guide/compatibility.md) OTHERWISE YOUR CHANGES MAY BE LOST
+-->
+
 # Compatibility Guide

 Comet aims to provide consistent results with the version of Apache Spark that is being used.

 This guide offers information about areas of functionality where there are known differences.

+## Parquet Scans
+
+Comet currently has three distinct implementations of the Parquet scan operator. The configuration property
+`spark.comet.scan.impl` is used to select an implementation.
+
+| Implementation          | Description                                                                                                                                                                            |
+| ----------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `native_comet`          | This is the default implementation. It provides strong compatibility with Spark but does not support complex types.                                                                     |
+| `native_datafusion`     | This implementation delegates to DataFusion's `ParquetExec`.                                                                                                                             |
+| `native_iceberg_compat` | This implementation also delegates to DataFusion's `ParquetExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future.   |
+
+The new (and currently experimental) `native_datafusion` and `native_iceberg_compat` scans are being added to
+provide the following benefits over the `native_comet` implementation:
+
+- Leverage the DataFusion community's ongoing improvements to `ParquetExec`
+- Provide support for reading complex types (structs, arrays, and maps)
+- Remove the use of reusable mutable buffers in Comet, which are complex to maintain
+
+These new implementations are not yet complete. Some of the current limitations are:
+
+- Scanning Parquet files containing unsigned 8 or 16-bit integers can produce results that don't match Spark. By default, Comet
+  will fall back to Spark when using these scan implementations to read Parquet files containing 8 or 16-bit integers.
+  This behavior can be disabled by setting `spark.comet.scan.allowIncompatible=true`.
+- These implementations do not yet fully support timestamps, decimals, or complex types.
+
 ## ANSI mode

 Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default,
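To make the new documentation concrete, here is a minimal sketch of how a user might opt into one of the experimental scans and re-enable the incompatible integer types. The two `spark.comet.scan.*` keys are documented in this commit; the session wiring around them, the app name, and the file path are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a Comet-enabled Spark build.
val spark = SparkSession
  .builder()
  .appName("comet-experimental-scan") // hypothetical app name
  .config("spark.comet.scan.impl", "native_datafusion") // or "native_iceberg_compat"
  .config("spark.comet.scan.allowIncompatible", "true") // opt out of the Spark fallback
  .getOrCreate()

// With allowIncompatible=true, Parquet files containing 8- or 16-bit integers
// are read by the experimental scan even though results may differ from Spark.
val df = spark.read.parquet("/tmp/example.parquet") // hypothetical path
```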

docs/source/user-guide/configs.md

Lines changed: 7 additions & 1 deletion
@@ -17,6 +17,12 @@ specific language governing permissions and limitations
 under the License.
 -->

+<!--
+TO MODIFY THIS CONTENT MAKE SURE THAT YOU MAKE YOUR CHANGES TO THE TEMPLATE FILE
+(docs/templates/configs-template.md) AND NOT THE GENERATED FILE
+(docs/source/user-guide/configs.md) OTHERWISE YOUR CHANGES MAY BE LOST
+-->
+
 # Comet Configuration Settings

 Comet provides the following configuration settings.
@@ -76,7 +82,7 @@ Comet provides the following configuration settings.
 | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
 | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
 | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
-| spark.comet.scan.allowIncompatible | Comet is not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | true |
+| spark.comet.scan.allowIncompatible | Comet is not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
 | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false |
 | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |

docs/templates/compatibility-template.md

Lines changed: 33 additions & 2 deletions
@@ -17,12 +17,43 @@
 under the License.
 -->

+<!--
+TO MODIFY THIS CONTENT MAKE SURE THAT YOU MAKE YOUR CHANGES TO THE TEMPLATE FILE
+(docs/templates/compatibility-template.md) AND NOT THE GENERATED FILE
+(docs/source/user-guide/compatibility.md) OTHERWISE YOUR CHANGES MAY BE LOST
+-->
+
 # Compatibility Guide

 Comet aims to provide consistent results with the version of Apache Spark that is being used.

 This guide offers information about areas of functionality where there are known differences.

+## Parquet Scans
+
+Comet currently has three distinct implementations of the Parquet scan operator. The configuration property
+`spark.comet.scan.impl` is used to select an implementation.
+
+| Implementation          | Description                                                                                                                                                                            |
+| ----------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `native_comet`          | This is the default implementation. It provides strong compatibility with Spark but does not support complex types.                                                                     |
+| `native_datafusion`     | This implementation delegates to DataFusion's `ParquetExec`.                                                                                                                             |
+| `native_iceberg_compat` | This implementation also delegates to DataFusion's `ParquetExec` but uses a hybrid approach of JVM and native code. This scan is designed to be integrated with Iceberg in the future.   |
+
+The new (and currently experimental) `native_datafusion` and `native_iceberg_compat` scans are being added to
+provide the following benefits over the `native_comet` implementation:
+
+- Leverage the DataFusion community's ongoing improvements to `ParquetExec`
+- Provide support for reading complex types (structs, arrays, and maps)
+- Remove the use of reusable mutable buffers in Comet, which are complex to maintain
+
+These new implementations are not yet complete. Some of the current limitations are:
+
+- Scanning Parquet files containing unsigned 8 or 16-bit integers can produce results that don't match Spark. By default, Comet
+  will fall back to Spark when using these scan implementations to read Parquet files containing 8 or 16-bit integers.
+  This behavior can be disabled by setting `spark.comet.scan.allowIncompatible=true`.
+- These implementations do not yet fully support timestamps, decimals, or complex types.
+
 ## ANSI mode

 Comet currently ignores ANSI mode in most cases, and therefore can produce different results than Spark. By default,
@@ -47,7 +78,7 @@ will fall back to Spark but can be enabled by setting `spark.comet.expression.al

 ## Array Expressions

-Comet has experimental support for a number of array expressions. These are experimental and currently marked
+Comet has experimental support for a number of array expressions. These are experimental and currently marked
 as incompatible and can be enabled by setting `spark.comet.expression.allowIncompatible=true`.

 ## Regular Expressions
@@ -82,5 +113,5 @@ The following cast operations are not compatible with Spark for all inputs and a

 ### Unsupported Casts

-Any cast not listed in the previous tables is currently unsupported. We are working on adding more. See the
+Any cast not listed in the previous tables is currently unsupported. We are working on adding more. See the
 [tracking issue](https://github.com/apache/datafusion-comet/issues/286) for more details.

docs/templates/configs-template.md

Lines changed: 6 additions & 0 deletions
@@ -17,6 +17,12 @@
 under the License.
 -->

+<!--
+TO MODIFY THIS CONTENT MAKE SURE THAT YOU MAKE YOUR CHANGES TO THE TEMPLATE FILE
+(docs/templates/configs-template.md) AND NOT THE GENERATED FILE
+(docs/source/user-guide/configs.md) OTHERWISE YOUR CHANGES MAY BE LOST
+-->
+
 # Comet Configuration Settings

 Comet provides the following configuration settings.

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 1 addition & 5 deletions
@@ -1352,15 +1352,11 @@ object CometSparkSessionExtensions extends Logging {
     org.apache.spark.SPARK_VERSION >= "4.0"
   }

-  def isComplexTypeReaderEnabled(conf: SQLConf): Boolean = {
+  def usingDataFusionParquetExec(conf: SQLConf): Boolean = {
     CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_ICEBERG_COMPAT ||
     CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION
   }

-  def usingDataFusionParquetReader(conf: SQLConf): Boolean = {
-    isComplexTypeReaderEnabled(conf) && !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
-  }
-
   /** Calculates required memory overhead in MB per executor process for Comet. */
   def getCometMemoryOverheadInMiB(sparkConf: SparkConf): Long = {
     val baseMemoryMiB = if (cometUnifiedMemoryManagerEnabled(sparkConf)) {
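With `usingDataFusionParquetReader` removed, call sites that need the combined check now inline it against the renamed predicate. A sketch of the pattern, using only names that appear in this commit (CometCastSuite below adopts exactly this shape):

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.{CometConf, CometSparkSessionExtensions}

// The check formerly provided by the removed usingDataFusionParquetReader
// helper: "using a DataFusion-backed scan AND incompatible types not allowed".
def usingParquetExecWithIncompatTypes(conf: SQLConf): Boolean =
  CometSparkSessionExtensions.usingDataFusionParquetExec(conf) &&
    !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
```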

spark/src/main/scala/org/apache/comet/DataTypeSupport.scala

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ trait DataTypeSupport {

   private def isGloballySupported(dt: DataType): Boolean = dt match {
     case ByteType | ShortType
-        if CometSparkSessionExtensions.isComplexTypeReaderEnabled(SQLConf.get) &&
+        if CometSparkSessionExtensions.usingDataFusionParquetExec(SQLConf.get) &&
           !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
       false
     case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
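The effect of this guard, combined with the new default, is that byte and short columns are reported as unsupported under the experimental scans, so such reads fall back to Spark. An illustrative snippet (the `spark` session and the path are assumptions; the fallback behavior is as described in this commit's compatibility docs):

```scala
// Illustrative only: assumes spark.comet.scan.impl=native_datafusion and the
// new default spark.comet.scan.allowIncompatible=false.
spark.range(10)
  .selectExpr("cast(id as byte) as b", "cast(id as short) as s")
  .write
  .mode("overwrite")
  .parquet("/tmp/bytes_and_shorts") // hypothetical path

// isGloballySupported returns false for ByteType/ShortType here, so this
// read is executed by Spark's own Parquet reader rather than Comet's scan.
val df = spark.read.parquet("/tmp/bytes_and_shorts")
df.show()
```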

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 23 additions & 22 deletions
@@ -59,8 +59,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {

   private val timestampPattern = "0123456789/:T" + whitespaceChars

-  lazy val usingDataFusionParquetReader: Boolean =
-    CometSparkSessionExtensions.usingDataFusionParquetReader(conf)
+  lazy val usingParquetExecWithIncompatTypes: Boolean =
+    CometSparkSessionExtensions.usingDataFusionParquetExec(conf) &&
+      !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)

   test("all valid cast combinations covered") {
     val names = testNames
@@ -151,71 +152,71 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(
       generateBytes(),
       DataTypes.BooleanType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ByteType to ShortType") {
     castTest(
       generateBytes(),
       DataTypes.ShortType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ByteType to IntegerType") {
     castTest(
       generateBytes(),
       DataTypes.IntegerType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ByteType to LongType") {
     castTest(
       generateBytes(),
       DataTypes.LongType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ByteType to FloatType") {
     castTest(
       generateBytes(),
       DataTypes.FloatType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ByteType to DoubleType") {
     castTest(
       generateBytes(),
       DataTypes.DoubleType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ByteType to DecimalType(10,2)") {
     castTest(
       generateBytes(),
       DataTypes.createDecimalType(10, 2),
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ByteType to StringType") {
     castTest(
       generateBytes(),
       DataTypes.StringType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   ignore("cast ByteType to BinaryType") {
     castTest(
       generateBytes(),
       DataTypes.BinaryType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   ignore("cast ByteType to TimestampType") {
     // input: -1, expected: 1969-12-31 15:59:59.0, actual: 1969-12-31 15:59:59.999999
     castTest(
       generateBytes(),
       DataTypes.TimestampType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   // CAST from ShortType
@@ -224,72 +225,72 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     castTest(
       generateShorts(),
       DataTypes.BooleanType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ShortType to ByteType") {
     // https://github.com/apache/datafusion-comet/issues/311
     castTest(
       generateShorts(),
       DataTypes.ByteType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ShortType to IntegerType") {
     castTest(
       generateShorts(),
       DataTypes.IntegerType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ShortType to LongType") {
     castTest(
       generateShorts(),
       DataTypes.LongType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ShortType to FloatType") {
     castTest(
       generateShorts(),
       DataTypes.FloatType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ShortType to DoubleType") {
     castTest(
       generateShorts(),
       DataTypes.DoubleType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ShortType to DecimalType(10,2)") {
     castTest(
       generateShorts(),
       DataTypes.createDecimalType(10, 2),
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   test("cast ShortType to StringType") {
     castTest(
       generateShorts(),
       DataTypes.StringType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   ignore("cast ShortType to BinaryType") {
     castTest(
       generateShorts(),
       DataTypes.BinaryType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   ignore("cast ShortType to TimestampType") {
     // input: -1003, expected: 1969-12-31 15:43:17.0, actual: 1969-12-31 15:59:59.998997
     castTest(
       generateShorts(),
       DataTypes.TimestampType,
-      hasIncompatibleType = usingDataFusionParquetReader)
+      hasIncompatibleType = usingParquetExecWithIncompatTypes)
   }

   // CAST from integer

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -141,7 +141,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       Byte.MaxValue)
     withParquetTable(path.toString, "tbl") {
       val qry = "select _9 from tbl order by _11"
-      if (CometSparkSessionExtensions.isComplexTypeReaderEnabled(conf)) {
+      if (CometSparkSessionExtensions.usingDataFusionParquetExec(conf)) {
         if (!allowIncompatible) {
           checkSparkAnswer(qry)
         } else {

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -139,7 +139,7 @@ abstract class ParquetReadSuite extends CometTestBase {
           i.toDouble,
           DateTimeUtils.toJavaDate(i))
       }
-      if (!CometSparkSessionExtensions.isComplexTypeReaderEnabled(
+      if (!CometSparkSessionExtensions.usingDataFusionParquetExec(
           conf) || CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get()) {
         checkParquetScan(data)
       }
@@ -162,7 +162,7 @@ abstract class ParquetReadSuite extends CometTestBase {
           i.toDouble,
           DateTimeUtils.toJavaDate(i))
       }
-      if (!CometSparkSessionExtensions.isComplexTypeReaderEnabled(
+      if (!CometSparkSessionExtensions.usingDataFusionParquetExec(
          conf) || CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get()) {
         checkParquetScan(data)
       }
@@ -184,7 +184,7 @@ abstract class ParquetReadSuite extends CometTestBase {
           DateTimeUtils.toJavaDate(i))
       }
       val filter = (row: Row) => row.getBoolean(0)
-      if (!CometSparkSessionExtensions.isComplexTypeReaderEnabled(
+      if (!CometSparkSessionExtensions.usingDataFusionParquetExec(
          conf) || CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get()) {
         checkParquetScan(data, filter)
       }
