
Commit 1a000ab

Authored by andygrove and claude
refactor: rename scan.allowIncompatible to scan.unsignedSmallIntSafetyCheck (apache#3238)
* refactor: rename scan.allowIncompatible to scan.unsignedSmallIntSafetyCheck

This change renames `spark.comet.scan.allowIncompatible` to `spark.comet.scan.unsignedSmallIntSafetyCheck` and changes its default to `true` (enabled by default). The key change is that ByteType is removed from the safety check entirely, leaving only ShortType subject to fallback behavior.

## Why ByteType is Safe

ByteType columns are always safe for native execution because:

1. **Parquet type mapping**: Spark's ByteType can only originate from signed INT8 in Parquet. There is no unsigned 8-bit Parquet type (UINT_8) that maps to ByteType.
2. **UINT_8 maps to ShortType**: When Parquet files contain unsigned UINT_8 columns, Spark maps them to ShortType (16-bit), not ByteType. This is because UINT_8 values (0-255) exceed the signed byte range (-128 to 127).
3. **Truncation preserves signed values**: When storing signed INT8 in 8 bits, the truncation from any wider representation preserves the correct signed value due to two's complement representation.

## Why ShortType Needs the Safety Check

ShortType columns may be problematic because:

1. **Ambiguous origin**: ShortType can come from either signed INT16 (safe) or unsigned UINT_8 (potentially incompatible).
2. **Different reader behavior**: Arrow-based readers like DataFusion respect the unsigned UINT_8 logical type and read data as unsigned, while Spark ignores the logical type and reads as signed. This can produce different results for values 128-255.
3. **No metadata available**: At scan time, Comet cannot determine whether a ShortType column originated from INT16 or UINT_8, so the safety check conservatively falls back to Spark for all ShortType columns.

Users who know their data does not contain unsigned UINT_8 columns can disable the safety check with `spark.comet.scan.unsignedSmallIntSafetyCheck=false`.
Co-Authored-By: Claude Opus 4.5 <[email protected]>

* format
* rename
* rename
* Fix clippy warnings for Rust 1.93
  - Use local `root_op` variable instead of unwrapping `exec_context.root_op`
  - Replace `is_some()` + `unwrap()` pattern with `if let Some(...)`

Co-Authored-By: Claude Opus 4.5 <[email protected]>

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
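The signed-versus-unsigned divergence described above can be illustrated on the JVM. This is a standalone Java sketch, not Comet code: the same 8-bit pattern that a writer stored as unsigned UINT_8 value 200 reads back as -56 when the logical type is ignored (Spark-style) and as 200 when it is respected (Arrow-style).

```java
public class UnsignedSmallIntDemo {
    public static void main(String[] args) {
        // An 8-bit pattern written to Parquet as the unsigned UINT_8 value 200.
        byte raw = (byte) 200; // bit pattern 0xC8

        // Spark-style read: ignore the UINT_8 logical type, interpret the
        // bits as a signed byte, then widen to short.
        short signedView = raw;

        // Arrow-style read: respect UINT_8 and widen the unsigned value.
        short unsignedView = (short) (raw & 0xFF);

        System.out.println(signedView);   // -56
        System.out.println(unsignedView); // 200

        // Values 0-127 agree under both interpretations; only 128-255 diverge.
        byte small = (byte) 100;
        System.out.println((short) small == (short) (small & 0xFF)); // true
    }
}
```

This is exactly the 128-255 range mentioned in point 2 above: for those values the sign bit of the 8-bit pattern is set, so the two interpretations disagree.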
1 parent 1b75777 commit 1a000ab

File tree

8 files changed: +37 −29 lines changed


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 10 additions & 5 deletions
```diff
@@ -766,13 +766,18 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)
 
-  val COMET_SCAN_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
-    conf("spark.comet.scan.allowIncompatible")
+  val COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK: ConfigEntry[Boolean] =
+    conf("spark.comet.scan.unsignedSmallIntSafetyCheck")
       .category(CATEGORY_SCAN)
-      .doc("Some Comet scan implementations are not currently fully compatible with Spark for " +
-        s"all datatypes. Set this config to true to allow them anyway. $COMPAT_GUIDE.")
+      .doc(
+        "Parquet files may contain unsigned 8-bit integers (UINT_8) which Spark maps to " +
+          "ShortType. When this config is true (default), Comet falls back to Spark for " +
+          "ShortType columns because we cannot distinguish signed INT16 (safe) from unsigned " +
+          "UINT_8 (may produce different results). Set to false to allow native execution of " +
+          "ShortType columns if you know your data does not contain unsigned UINT_8 columns " +
+          s"from improperly encoded Parquet files. $COMPAT_GUIDE.")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val COMET_EXEC_STRICT_FLOATING_POINT: ConfigEntry[Boolean] =
     conf("spark.comet.exec.strictFloatingPoint")
```
docs/source/contributor-guide/parquet_scans.md

Lines changed: 7 additions & 6 deletions
```diff
@@ -42,12 +42,13 @@ implementation:
 
 The `native_datafusion` and `native_iceberg_compat` scans share the following limitations:
 
-- When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
-  or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
-  logical types. Arrow-based readers, such as DataFusion and Comet do respect these types and read the data as unsigned
-  rather than signed. By default, Comet will fall back to Spark's native scan when scanning Parquet files containing
-  `byte` or `short` types (regardless of the logical type). This behavior can be disabled by setting
-  `spark.comet.scan.allowIncompatible=true`.
+- When reading Parquet files written by systems other than Spark that contain columns with the logical type `UINT_8`
+  (unsigned 8-bit integers), Comet may produce different results than Spark. Spark maps `UINT_8` to `ShortType`, but
+  Comet's Arrow-based readers respect the unsigned type and read the data as unsigned rather than signed. Since Comet
+  cannot distinguish `ShortType` columns that came from `UINT_8` versus signed `INT16`, by default Comet falls back to
+  Spark when scanning Parquet files containing `ShortType` columns. This behavior can be disabled by setting
+  `spark.comet.scan.unsignedSmallIntSafetyCheck=false`. Note that `ByteType` columns are always safe because they can
+  only come from signed `INT8`, where truncation preserves the signed value.
 - No support for default values that are nested types (e.g., maps, arrays, structs). Literal default values are supported.
 
 The `native_datafusion` scan has some additional limitations:
```
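The claim above that `ByteType` is always safe rests on two's complement truncation. A small Java sketch (illustrative, not Comet code) shows every signed INT8 value surviving a widen-then-truncate round trip:

```java
public class ByteRoundTripDemo {
    public static void main(String[] args) {
        // Every signed INT8 value widens to short and truncates back unchanged,
        // because two's complement truncation keeps the low 8 bits intact.
        boolean allPreserved = true;
        for (int v = -128; v <= 127; v++) {
            short widened = (short) v;       // wider representation
            byte truncated = (byte) widened; // back to 8 bits
            if (truncated != v) {
                allPreserved = false;
            }
        }
        System.out.println(allPreserved); // true
    }
}
```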

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 6 additions & 4 deletions
```diff
@@ -721,11 +721,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
       name: String,
       fallbackReasons: ListBuffer[String]): Boolean = {
     dt match {
-      case ByteType | ShortType
+      case ShortType
           if scanImpl != CometConf.SCAN_NATIVE_COMET &&
-            !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get() =>
-        fallbackReasons += s"$scanImpl scan cannot read $dt when " +
-          s"${CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key} is false. ${CometConf.COMPAT_GUIDE}."
+            CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
+        fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
+          s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
+          "native execution if your data does not contain unsigned small integers. " +
+          CometConf.COMPAT_GUIDE
         false
       case _: StructType | _: ArrayType | _: MapType if scanImpl == CometConf.SCAN_NATIVE_COMET =>
         false
```

spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -120,7 +120,7 @@ class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
     super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
         CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
         testFun
       }
```

spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -459,8 +459,8 @@ class CometParquetWriterSuite extends CometTestBase {
       CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
       // explicitly set scan impl to override CI defaults
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> "auto",
-      // COMET_SCAN_ALLOW_INCOMPATIBLE is needed because input data contains byte/short types
-      CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+      // Disable unsigned small int safety check for ShortType columns
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "false",
       // use a different timezone to make sure that timezone handling works with nested types
       SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax") {
```
spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -1904,7 +1904,7 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
     val rows = 1000
     withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-      CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
       makeParquetFileAllPrimitiveTypes(
         path,
         dictionaryEnabled = false,
```

spark/src/test/scala/org/apache/comet/rules/CometScanRuleSuite.scala

Lines changed: 7 additions & 7 deletions
```diff
@@ -140,21 +140,21 @@ class CometScanRuleSuite extends CometTestBase {
     }
   }
 
-  test("CometScanRule should fallback to Spark for unsupported data types in v1 scan") {
+  test("CometScanRule should fallback to Spark for ShortType when safety check enabled") {
     withTempPath { path =>
-      // Create test data with unsupported types (e.g., BinaryType, CalendarIntervalType)
+      // Create test data with ShortType which may be from unsigned UINT_8
       import org.apache.spark.sql.types._
       val unsupportedSchema = new StructType(
         Array(
           StructField("id", DataTypes.IntegerType, nullable = true),
           StructField(
             "value",
-            DataTypes.ByteType,
+            DataTypes.ShortType,
             nullable = true
-          ), // Unsupported in some scan modes
+          ), // May be from unsigned UINT_8
           StructField("name", DataTypes.StringType, nullable = true)))
 
-      val testData = Seq(Row(1, 1.toByte, "test1"), Row(2, -1.toByte, "test2"))
+      val testData = Seq(Row(1, 1.toShort, "test1"), Row(2, -1.toShort, "test2"))
 
       val df = spark.createDataFrame(spark.sparkContext.parallelize(testData), unsupportedSchema)
       df.write.parquet(path.toString)
@@ -167,10 +167,10 @@ class CometScanRuleSuite extends CometTestBase {
 
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_ICEBERG_COMPAT,
-        CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false") {
+        CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
         val transformedPlan = applyCometScanRule(sparkPlan)
 
-        // Should fallback to Spark due to unsupported ByteType in schema
+        // Should fallback to Spark due to ShortType (may be from unsigned UINT_8)
         assert(countOperators(transformedPlan, classOf[FileSourceScanExec]) == 1)
         assert(countOperators(transformedPlan, classOf[CometScanExec]) == 0)
       }
```

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -83,7 +83,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
-    conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
+    conf.set(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key, "false")
     conf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "2g")
     conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
     // SortOrder is incompatible for mixed zero and negative zero floating point values, but
@@ -1113,7 +1113,7 @@ abstract class CometTestBase
  *     |""".stripMargin,
  *   "select arr from tbl",
  *   sqlConf = Seq(
- *     CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "false",
+ *     CometConf.COMET_SCAN_UNSIGNED_SMALL_INT_SAFETY_CHECK.key -> "true",
  *     "spark.comet.explainFallback.enabled" -> "false"
  *   ),
  *   debugCometDF = df => {
@@ -1275,6 +1275,6 @@ abstract class CometTestBase
 
   def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
     usingDataSourceExec(conf) &&
-      !CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.get(conf)
+      CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
   }
 }
```
