
Commit 2665b52

feat: Support Type widening: byte → short/int/long, short → int/long (#1770)
* Support type widening for Spark 4.0
* formatting
* add one more test
* formatting
1 parent 38beead commit 2665b52
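
As a quick illustration of the behavior this commit enables, here is a minimal, hedged sketch (not code from the commit; the object name, path, and session setup are illustrative, and it assumes Comet is on the classpath with CometConf importable from org.apache.comet): it writes a Parquet file with a SHORT column and reads it back with a wider LONG schema, using the same schema-evolution flag the new tests turn on.

import org.apache.comet.CometConf
import org.apache.spark.sql.SparkSession

object TypeWideningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]")
      // Same flag the new tests enable via withSQLConf.
      .config(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
      .getOrCreate()
    import spark.implicits._

    // Write a Parquet file whose single column is a SHORT.
    val path = "/tmp/comet-widening-short" // illustrative path
    (1 to 10).map(_.toShort).toDF("col1").write.mode("overwrite").parquet(path)

    // Read the same file back with a wider LONG schema (short -> long widening).
    val widened = spark.read.schema("col1 LONG").parquet(path)
    widened.show()

    spark.stop()
  }
}

The new tests added below exercise the same pattern through the CometTestBase helpers (withSQLConf, withTempPath, checkAnswer).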

2 files changed, +96 -0 lines changed


native/core/src/parquet/read/column.rs

Lines changed: 14 additions & 0 deletions
@@ -154,13 +154,27 @@ impl ColumnReader {
                         )
                     }
                 }
+                // promote byte to short
+                PhysicalType::INT32 if promotion_info.bit_width == 16 => {
+                    typed_reader!(Int16ColumnReader, Int16)
+                }
+                // promote byte to int
+                PhysicalType::INT32 if promotion_info.bit_width == 32 => {
+                    typed_reader!(Int32ColumnReader, Int32)
+                }
+                // promote byte to long
+                PhysicalType::INT64 => typed_reader!(Int32To64ColumnReader, Int64),
                 _ => typed_reader!(Int8ColumnReader, Int8),
             },
             (8, false) => typed_reader!(UInt8ColumnReader, Int16),
             (16, true) => match promotion_info.physical_type {
                 PhysicalType::DOUBLE => {
                     typed_reader!(Int16ToDoubleColumnReader, Float64)
                 }
+                // promote short to long
+                PhysicalType::INT64 => {
+                    typed_reader!(Int32To64ColumnReader, Int64)
+                }
                 PhysicalType::INT32 if promotion_info.bit_width == 32 => {
                     typed_reader!(Int32ColumnReader, Int32)
                 }
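
The added arms extend the dispatch for signed 8-bit and 16-bit columns: promotion_info describes the type requested by the Spark read schema, so a byte column can now be decoded as short (Int16ColumnReader), int (Int32ColumnReader), or long (Int32To64ColumnReader), and a short column as long. As a hedged sketch of which reads would hit each new byte-widening arm when the Comet native scan serves the file (column name, path, and session setup are illustrative, not from the commit):

import org.apache.comet.CometConf
import org.apache.spark.sql.SparkSession

object ByteWideningArmsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]")
      .config(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
      .getOrCreate()
    import spark.implicits._

    // A single-column Parquet file written with BYTE values.
    val path = "/tmp/comet-widening-byte" // illustrative path
    (1 to 10).map(_.toByte).toDF("col1").write.mode("overwrite").parquet(path)

    // Each wider read schema corresponds to one of the new reader arms above:
    //   col1 SHORT -> physical type INT32, bit_width == 16 -> Int16ColumnReader
    //   col1 INT   -> physical type INT32, bit_width == 32 -> Int32ColumnReader
    //   col1 LONG  -> physical type INT64                  -> Int32To64ColumnReader
    Seq("col1 SHORT", "col1 INT", "col1 LONG").foreach { ddl =>
      spark.read.schema(ddl).parquet(path).show()
    }

    spark.stop()
  }
}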

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 82 additions & 0 deletions
@@ -1246,6 +1246,88 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
+  test("type widening: byte → short/int/long, short → int/long, int → long") {
+    withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val values = 1 to 10
+        val options: Map[String, String] = Map.empty[String, String]
+
+        // Input types and corresponding DataFrames
+        val inputDFs = Seq(
+          "byte" -> values.map(_.toByte).toDF("col1"),
+          "short" -> values.map(_.toShort).toDF("col1"),
+          "int" -> values.map(_.toInt).toDF("col1"))
+
+        // Target Spark read schemas for widening
+        val widenTargets = Seq(
+          "short" -> values.map(_.toShort).toDF("col1"),
+          "int" -> values.map(_.toInt).toDF("col1"),
+          "long" -> values.map(_.toLong).toDF("col1"))
+
+        for ((inputType, inputDF) <- inputDFs) {
+          val writePath = s"$path/$inputType"
+          inputDF.write.format("parquet").options(options).save(writePath)
+
+          for ((targetType, targetDF) <- widenTargets) {
+            // Only test valid widenings (e.g., don't test int → short)
+            val wideningValid = (inputType, targetType) match {
+              case ("byte", "short" | "int" | "long") => true
+              case ("short", "int" | "long") => true
+              case ("int", "long") => true
+              case _ => false
+            }
+
+            if (wideningValid) {
+              val reader = spark.read
+                .schema(s"col1 $targetType")
+                .format("parquet")
+                .options(options)
+                .load(writePath)
+
+              checkAnswer(reader, targetDF)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("read byte, int, short, long together") {
+    withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+
+        val byteDF = (Byte.MaxValue - 2 to Byte.MaxValue).map(_.toByte).toDF("col1")
+        val shortDF = (Short.MaxValue - 2 to Short.MaxValue).map(_.toShort).toDF("col1")
+        val intDF = (Int.MaxValue - 2 to Int.MaxValue).toDF("col1")
+        val longDF = (Long.MaxValue - 2 to Long.MaxValue).toDF("col1")
+        val unionDF = byteDF.union(shortDF).union(intDF).union(longDF)
+
+        val byteDir = s"$path${File.separator}part=byte"
+        val shortDir = s"$path${File.separator}part=short"
+        val intDir = s"$path${File.separator}part=int"
+        val longDir = s"$path${File.separator}part=long"
+
+        val options: Map[String, String] = Map.empty[String, String]
+
+        byteDF.write.format("parquet").options(options).save(byteDir)
+        shortDF.write.format("parquet").options(options).save(shortDir)
+        intDF.write.format("parquet").options(options).save(intDir)
+        longDF.write.format("parquet").options(options).save(longDir)
+
+        val df = spark.read
+          .schema(unionDF.schema)
+          .format("parquet")
+          .options(options)
+          .load(path)
+          .select("col1")
+
+        checkAnswer(df, unionDF)
+      }
+    }
+  }
+
   test("scan metrics") {
     // https://github.com/apache/datafusion-comet/issues/1441
     assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
