Commit 6361bcd

fix: correctly handle schemas with nested array of struct (native_iceberg_compat) (apache#1883)
* fix: correctly handle schemas with nested array of struct
1 parent: 17a36bc
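For context, the schema shape this commit fixes is a struct column that contains an array of structs. A minimal reproduction sketch for a spark-shell session (column names and the output path are illustrative, not from this commit; enabling Comet's native_iceberg_compat scan is assumed to be configured separately):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Schema shape covered by this fix: a struct column holding an array of structs.
val schema = StructType(Seq(
  StructField(
    "s",
    StructType(Seq(
      StructField("name", StringType),
      StructField(
        "items",
        ArrayType(StructType(Seq(
          StructField("a", StringType),
          StructField("b", StringType))))))))))

val data = Seq(Row(Row("x", Seq(Row("y", "z")))), Row(null))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Round-trip through Parquet; the read path is where the field-id-aware
// schema mapping previously mishandled the nested list group.
df.write.mode("overwrite").parquet("/tmp/nested_array_of_struct")
spark.read.parquet("/tmp/nested_array_of_struct").show(truncate = false)
```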

2 files changed: +81 -3 lines

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 9 additions & 3 deletions
@@ -533,13 +533,20 @@ private StructType getSparkSchemaByFieldId(
     return newSchema;
   }
 
+  private static boolean isPrimitiveCatalystType(DataType dataType) {
+    return !(dataType instanceof ArrayType)
+        && !(dataType instanceof MapType)
+        && !(dataType instanceof StructType);
+  }
+
   private DataType getSparkTypeByFieldId(
       DataType dataType, Type parquetType, boolean caseSensitive) {
     DataType newDataType;
     if (dataType instanceof StructType) {
       newDataType =
           getSparkSchemaByFieldId((StructType) dataType, parquetType.asGroupType(), caseSensitive);
-    } else if (dataType instanceof ArrayType) {
+    } else if (dataType instanceof ArrayType
+        && !isPrimitiveCatalystType(((ArrayType) dataType).elementType())) {
 
       newDataType =
           getSparkArrayTypeByFieldId(
@@ -575,11 +582,10 @@ private DataType getSparkTypeByFieldId(
   }
 
   private DataType getSparkArrayTypeByFieldId(
-      ArrayType arrayType, GroupType parquetType, boolean caseSensitive) {
+      ArrayType arrayType, GroupType parquetList, boolean caseSensitive) {
     DataType newDataType;
     DataType elementType = arrayType.elementType();
     DataType newElementType;
-    Type parquetList = parquetType.getFields().get(0);
     Type parquetElementType;
     if (parquetList.getLogicalTypeAnnotation() == null
         && parquetList.isRepetition(Type.Repetition.REPEATED)) {
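For reference, Spark writes an ArrayType of StructType using Parquet's three-level LIST layout. After this change, getSparkArrayTypeByFieldId receives that LIST group directly as parquetList instead of unwrapping the first field of a parent group, and the new isPrimitiveCatalystType guard keeps arrays of primitive elements from entering this path at all. A sketch of the layout (hypothetical schema text, parsed with parquet-mr's MessageTypeParser; the `= n` suffixes are field ids):

```scala
import org.apache.parquet.schema.MessageTypeParser

// Three-level LIST layout for an array-of-struct column like the test's "_2":
// LIST group -> repeated "list" group -> "element" struct.
val message = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  optional group _2 (LIST) {
    |    repeated group list {
    |      optional group element {
    |        optional binary _1 (UTF8) = 2;
    |        optional binary _2 (UTF8) = 3;
    |      }
    |    }
    |  }
    |}""".stripMargin)

// The fixed method now receives this LIST group itself as `parquetList`,
// rather than a parent group whose first child it had to peel off.
val listGroup = message.getType("_2").asGroupType()
```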

spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala

Lines changed: 72 additions & 0 deletions
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -1745,6 +1746,77 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
+
+  private def withId(id: Int) =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  // Based on Spark ParquetIOSuite.test("vectorized reader: array of nested struct")
+  test("array of nested struct with and without field id") {
+    val nestedSchema = StructType(
+      Seq(StructField(
+        "_1",
+        StructType(Seq(
+          StructField("_1", StringType, nullable = true, withId(1)), // Field ID 1
+          StructField(
+            "_2",
+            ArrayType(StructType(Seq(
+              StructField("_1", StringType, nullable = true, withId(2)), // Field ID 2
+              StructField("_2", StringType, nullable = true, withId(3)) // Field ID 3
+            ))),
+            nullable = true))),
+        nullable = true)))
+    val nestedSchemaNoId = StructType(
+      Seq(StructField(
+        "_1",
+        StructType(Seq(
+          StructField("_1", StringType, nullable = true),
+          StructField(
+            "_2",
+            ArrayType(StructType(Seq(
+              StructField("_1", StringType, nullable = true),
+              StructField("_2", StringType, nullable = true)))),
+            nullable = true))),
+        nullable = true)))
+    // data matching the schema
+    val data = Seq(
+      Row(Row("a", null)),
+      Row(Row("b", Seq(Row("c", "d")))),
+      Row(null),
+      Row(Row("e", Seq(Row("f", null), Row(null, "g")))),
+      Row(Row(null, null)),
+      Row(Row(null, Seq(null))),
+      Row(Row(null, Seq(Row(null, null), Row("h", null), null))),
+      Row(Row("i", Seq())),
+      Row(null))
+    val answer =
+      Row(Row("a", null)) ::
+        Row(Row("b", Seq(Row("c", "d")))) ::
+        Row(null) ::
+        Row(Row("e", Seq(Row("f", null), Row(null, "g")))) ::
+        Row(Row(null, null)) ::
+        Row(Row(null, Seq(null))) ::
+        Row(Row(null, Seq(Row(null, null), Row("h", null), null))) ::
+        Row(Row("i", Seq())) ::
+        Row(null) ::
+        Nil
+
+    withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") {
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(data), nestedSchema)
+      withTempPath { path =>
+        df.write.parquet(path.getCanonicalPath)
+        readParquetFile(path.getCanonicalPath) { df =>
+          checkAnswer(df, answer)
+        }
+      }
+      val df2 = spark.createDataFrame(spark.sparkContext.parallelize(data), nestedSchemaNoId)
+      withTempPath { path =>
+        df2.write.parquet(path.getCanonicalPath)
+        readParquetFile(path.getCanonicalPath) { df =>
+          checkAnswer(df, answer)
+        }
+      }
+    }
+  }
 }
 
 class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
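The withId helper above stores the field id under Spark's ParquetUtils.FIELD_ID_METADATA_KEY ("parquet.field.id") in the column metadata; Spark's Parquet writer propagates that id into the file schema (when field-id writing is enabled, its default), and SQLConf.PARQUET_FIELD_ID_READ_ENABLED turns on id-based field matching at read time. A standalone sketch of what the helper builds:

```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

// Equivalent of the suite's withId(1): column metadata carrying a field id.
val meta = new MetadataBuilder()
  .putLong(ParquetUtils.FIELD_ID_METADATA_KEY, 1L)
  .build()

val field = StructField("_1", StringType, nullable = true, meta)
println(field.metadata.json) // {"parquet.field.id":1}
```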
