Skip to content

Commit 0e9423a

Browse files
committed
feat: enable iceberg compat tests, more tests for complex types
1 parent 9ef8075 commit 0e9423a

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2723,6 +2723,9 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
27232723
if isCometSink(op) && op.output.forall(a =>
27242724
supportedDataType(
27252725
a.dataType,
2726+
// Complex types are supported if:
2727+
// - Native datafusion reader enabled (experimental) OR
2728+
// - conversion from Parquet/JSON enabled
27262729
allowComplex =
27272730
usingDataFusionParquetExec(conf) || CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED
27282731
.get(conf) || CometConf.COMET_CONVERT_FROM_JSON_ENABLED.get(conf))) =>

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import org.apache.spark.unsafe.types.UTF8String
4646
import com.google.common.primitives.UnsignedLong
4747

4848
import org.apache.comet.{CometConf, CometSparkSessionExtensions}
49+
import org.apache.comet.CometConf.SCAN_NATIVE_ICEBERG_COMPAT
4950
import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, usingDataFusionParquetExec}
5051

5152
abstract class ParquetReadSuite extends CometTestBase {
@@ -121,13 +122,24 @@ abstract class ParquetReadSuite extends CometTestBase {
121122
}
122123

123124
test("unsupported Spark schema") {
124-
Seq(
125-
Seq(StructField("f1", IntegerType), StructField("f2", BooleanType)) -> true,
126-
Seq(StructField("f1", IntegerType), StructField("f2", ArrayType(IntegerType))) -> false,
127-
Seq(
128-
StructField("f1", MapType(keyType = LongType, valueType = StringType)),
129-
StructField("f2", ArrayType(DoubleType))) -> false).foreach { case (schema, expected) =>
125+
val schemaDDLs =
126+
Seq("f1 int, f2 boolean", "f1 int, f2 array<int>", "f1 map<long, string>, f2 array<double>")
127+
.map(s => StructType.fromDDL(s))
128+
129+
// Arrays are supported for native iceberg compat and for Parquet V1
130+
val cometScanExecSupported =
131+
if (sys.env.get("COMET_PARQUET_SCAN_IMPL").contains(SCAN_NATIVE_ICEBERG_COMPAT) && this
132+
.isInstanceOf[ParquetReadV1Suite])
133+
Seq(true, true, false)
134+
else Seq(true, false, false)
135+
136+
val cometBatchScanExecSupported = Seq(true, false, false)
137+
138+
schemaDDLs.zip(cometScanExecSupported).foreach { case (schema, expected) =>
130139
assert(CometScanExec.isSchemaSupported(StructType(schema)) == expected)
140+
}
141+
142+
schemaDDLs.zip(cometBatchScanExecSupported).foreach { case (schema, expected) =>
131143
assert(CometBatchScanExec.isSchemaSupported(StructType(schema)) == expected)
132144
}
133145
}

0 commit comments

Comments
 (0)