Skip to content

Commit ba53a7f

Browse files
authored (author name missing in extraction)
feat: add MAP type support for first level (#1603)
* feat: add MAP type support for first level
1 parent 4740e94 commit ba53a7f

File tree

6 files changed: +20 additions, -8 deletions

common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,8 @@ object Utils {
277277
case v @ (_: BitVector | _: TinyIntVector | _: SmallIntVector | _: IntVector |
278278
_: BigIntVector | _: Float4Vector | _: Float8Vector | _: VarCharVector |
279279
_: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _: VarBinaryVector |
280-
_: FixedSizeBinaryVector | _: TimeStampMicroVector | _: StructVector | _: ListVector) =>
280+
_: FixedSizeBinaryVector | _: TimeStampMicroVector | _: StructVector | _: ListVector |
281+
_: MapVector) =>
281282
v.asInstanceOf[FieldVector]
282283
case _ =>
283284
throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ object QueryPlanSerde extends Logging with CometExprShim {
7070
s.fields.map(_.dataType).forall(supportedDataType(_, allowComplex))
7171
case a: ArrayType if allowComplex =>
7272
supportedDataType(a.elementType, allowComplex)
73+
case m: MapType if allowComplex =>
74+
supportedDataType(m.keyType, allowComplex) && supportedDataType(m.valueType, allowComplex)
7375
case dt =>
7476
emitWarning(s"unsupported Spark data type: $dt")
7577
false

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,10 @@ object CometNativeScanExec extends DataTypeSupport {
230230
}
231231

232232
override def isAdditionallySupported(dt: DataType): Boolean = {
233-
// TODO add map
234233
dt match {
235234
case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)
236235
case a: ArrayType => isTypeSupported(a.elementType)
236+
case m: MapType => isTypeSupported(m.keyType) && isTypeSupported(m.valueType)
237237
case _ => false
238238
}
239239
}

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -480,10 +480,10 @@ object CometScanExec extends DataTypeSupport {
480480

481481
override def isAdditionallySupported(dt: DataType): Boolean = {
482482
if (CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
483-
// TODO add map
484483
dt match {
485484
case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)
486485
case a: ArrayType => isTypeSupported(a.elementType)
486+
case m: MapType => isTypeSupported(m.keyType) && isTypeSupported(m.valueType)
487487
case _ => false
488488
}
489489
} else {

spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,13 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
134134
|""".stripMargin,
135135
"select c0 from tbl")
136136
}
137+
138+
test("native reader - read simple MAP fields") {
139+
testSingleLineQuery(
140+
"""
141+
|select map('a', 1) as c0 union all
142+
|select map('b', 2)
143+
|""".stripMargin,
144+
"select c0 from tbl")
145+
}
137146
}

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,19 @@ abstract class ParquetReadSuite extends CometTestBase {
107107
Seq(
108108
StructField("f1", DecimalType.SYSTEM_DEFAULT),
109109
StructField("f2", StringType))) -> usingNativeIcebergCompat,
110-
MapType(keyType = LongType, valueType = DateType) -> false,
110+
MapType(keyType = LongType, valueType = DateType) -> usingNativeIcebergCompat,
111111
StructType(
112112
Seq(
113113
StructField("f1", ByteType),
114114
StructField("f2", StringType))) -> usingNativeIcebergCompat,
115-
MapType(keyType = IntegerType, valueType = BinaryType) -> false).foreach {
116-
case (dt, expected) =>
115+
MapType(keyType = IntegerType, valueType = BinaryType) -> usingNativeIcebergCompat)
116+
.foreach { case (dt, expected) =>
117117
assert(CometScanExec.isTypeSupported(dt) == expected)
118118
// usingDataFusionParquetExec does not support CometBatchScanExec yet
119119
if (!usingDataFusionParquetExec(conf)) {
120120
assert(CometBatchScanExec.isTypeSupported(dt) == expected)
121121
}
122-
}
122+
}
123123
}
124124

125125
test("unsupported Spark schema") {
@@ -131,7 +131,7 @@ abstract class ParquetReadSuite extends CometTestBase {
131131
val cometScanExecSupported =
132132
if (sys.env.get("COMET_PARQUET_SCAN_IMPL").contains(SCAN_NATIVE_ICEBERG_COMPAT) && this
133133
.isInstanceOf[ParquetReadV1Suite])
134-
Seq(true, true, false)
134+
Seq(true, true, true)
135135
else Seq(true, false, false)
136136

137137
val cometBatchScanExecSupported = Seq(true, false, false)

0 commit comments

Comments (0)