Skip to content

Commit 90a67a6

Browse files
committed
feat: add read array support
1 parent 315d6a7 commit 90a67a6

File tree

3 files changed

+75
-3
lines changed

3 files changed

+75
-3
lines changed

native/core/src/execution/serde.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType {
111111
{
112112
DatatypeStruct::List(info) => {
113113
let field = Field::new(
114-
"item",
114+
"element",
115115
to_arrow_datatype(info.element_type.as_ref().unwrap()),
116116
info.contains_null,
117117
);

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ object CometNativeScanExec extends DataTypeSupport {
182182
case null => null
183183
}
184184

185-
val newArgs = mapProductIterator(scanExec, transform(_))
185+
val newArgs = mapProductIterator(scanExec, transform)
186186
val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
187187
val batchScanExec = CometNativeScanExec(
188188
nativeOp,
@@ -202,9 +202,10 @@ object CometNativeScanExec extends DataTypeSupport {
202202
}
203203

204204
override def isAdditionallySupported(dt: DataType): Boolean = {
205-
// TODO add array and map
205+
// TODO add map
206206
dt match {
207207
case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)
208+
case a: ArrayType => isTypeSupported(a.elementType)
208209
case _ => false
209210
}
210211
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.exec
21+
22+
import scala.reflect.ClassTag
23+
import scala.reflect.runtime.universe._
24+
25+
import org.scalactic.source.Position
26+
import org.scalatest.Tag
27+
28+
import org.apache.hadoop.fs.Path
29+
import org.apache.spark.sql.{CometTestBase, DataFrame, Encoders}
30+
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
31+
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
32+
import org.apache.spark.sql.functions.col
33+
import org.apache.spark.sql.internal.SQLConf
34+
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
35+
36+
import org.apache.comet.CometConf
37+
import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
38+
39+
class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper {
40+
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
41+
pos: Position): Unit = {
42+
super.test(testName, testTags: _*) {
43+
withSQLConf(
44+
CometConf.COMET_EXEC_ENABLED.key -> "true",
45+
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
46+
CometConf.COMET_ENABLED.key -> "true",
47+
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
48+
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
49+
testFun
50+
}
51+
}
52+
}
53+
54+
test("native reader - read simple struct fields") {
55+
testSingleLineQuery(
56+
"""
57+
|select named_struct('firstName', 'John', 'lastName', 'Doe', 'age', 35) as personal_info union all
58+
|select named_struct('firstName', 'Jane', 'lastName', 'Doe', 'age', 40) as personal_info
59+
|""".stripMargin,
60+
"select personal_info.* from tbl")
61+
}
62+
63+
test("native reader - read simple array fields") {
64+
testSingleLineQuery(
65+
"""
66+
|select array(1, 2, 3) as arr union all
67+
|select array(2, 3, 4) as arr
68+
|""".stripMargin,
69+
"select arr from tbl")
70+
}
71+
}

0 commit comments

Comments
 (0)