Skip to content

Commit 0699d15

Browse files
committed
feat: add read array support
1 parent 2491e23 commit 0699d15

File tree

3 files changed

+25
-18
lines changed

3 files changed

+25
-18
lines changed

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,16 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
6060
logWarning(s"Comet native execution is disabled due to: $reason")
6161
}
6262

63-
def supportedDataType(dt: DataType, allowStruct: Boolean = false): Boolean = dt match {
63+
def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match {
6464
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
6565
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: DecimalType |
6666
_: DateType | _: BooleanType | _: NullType =>
6767
true
6868
case dt if isTimestampNTZType(dt) => true
69-
case s: StructType if allowStruct =>
70-
s.fields.map(_.dataType).forall(supportedDataType(_, allowStruct))
69+
case s: StructType if allowComplex =>
70+
s.fields.map(_.dataType).forall(supportedDataType(_, allowComplex))
71+
case a: ArrayType if allowComplex =>
72+
supportedDataType(a.elementType)
7173
case dt =>
7274
emitWarning(s"unsupported Spark data type: $dt")
7375
false
@@ -726,7 +728,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
726728
binding,
727729
(builder, binaryExpr) => builder.setLtEq(binaryExpr))
728730

729-
case Literal(value, dataType) if supportedDataType(dataType, allowStruct = value == null) =>
731+
case Literal(value, dataType)
732+
if supportedDataType(dataType, allowComplex = value == null) =>
730733
val exprBuilder = ExprOuterClass.Literal.newBuilder()
731734

732735
if (value == null) {
@@ -2666,7 +2669,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
26662669
withInfo(join, "SortMergeJoin is not enabled")
26672670
None
26682671

2669-
case op if isCometSink(op) && op.output.forall(a => supportedDataType(a.dataType, true)) =>
2672+
case op
2673+
if isCometSink(op) && op.output.forall(a =>
2674+
supportedDataType(a.dataType, allowComplex = true)) =>
26702675
// These operators are source of Comet native execution chain
26712676
val scanBuilder = OperatorOuterClass.Scan.newBuilder()
26722677
val source = op.simpleStringWithNodeId()

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,9 +486,10 @@ object CometScanExec extends DataTypeSupport {
486486

487487
override def isAdditionallySupported(dt: DataType): Boolean = {
488488
if (CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
489-
// TODO add array and map
489+
// TODO add map
490490
dt match {
491491
case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)
492+
case a: ArrayType => isTypeSupported(a.elementType)
492493
case _ => false
493494
}
494495
} else {

spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,20 @@ import org.apache.comet.CometConf
3131
class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper {
3232
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
3333
pos: Position): Unit = {
34-
super.test(testName, testTags: _*) {
35-
withSQLConf(
36-
CometConf.COMET_EXEC_ENABLED.key -> "true",
37-
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
38-
CometConf.COMET_ENABLED.key -> "true",
39-
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
40-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
41-
testFun
42-
}
43-
}
34+
Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach(scan =>
35+
super.test(s"$testName - $scan", testTags: _*) {
36+
withSQLConf(
37+
CometConf.COMET_EXEC_ENABLED.key -> "true",
38+
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
39+
CometConf.COMET_ENABLED.key -> "true",
40+
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
41+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> scan) {
42+
testFun
43+
}
44+
})
4445
}
4546

46-
test("native reader - read simple struct fields") {
47+
test("native reader - read simple STRUCT fields") {
4748
testSingleLineQuery(
4849
"""
4950
|select named_struct('firstName', 'John', 'lastName', 'Doe', 'age', 35) as personal_info union all
@@ -52,7 +53,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
5253
"select personal_info.* from tbl")
5354
}
5455

55-
test("native reader - read simple array fields") {
56+
test("native reader - read simple ARRAY fields") {
5657
testSingleLineQuery(
5758
"""
5859
|select array(1, 2, 3) as arr union all

0 commit comments

Comments
 (0)