Description
Problem
Spark 4.0.x's ParquetUnshreddedVariantConverter (in ParquetRowConverter.scala) assumes Variant group fields are always ordered [value, metadata]. When a Parquet file stores the fields in the spec-compliant order [metadata, value], Spark swaps the bytes, passing metadata bytes as the value and vice versa, resulting in MALFORMED_VARIANT.
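To make the failure mode concrete, here is a hypothetical, language-neutral simulation in Python (not Spark code): converters are stored at the fixed positions [value, metadata], but the reader dispatches by the field's index in the file schema, so a [metadata, value] file routes metadata bytes into the value converter and vice versa.

```python
# Hypothetical simulation of the buggy dispatch (not Spark code).
# Converters are built in hardcoded [value, metadata] order, but the
# reader looks them up by the field's position in the file schema.

def build_converters_buggy(schema_field_names):
    # Fixed order, ignoring where each field actually sits in the schema.
    return ["value-converter", "metadata-converter"]

def read_variant(schema_field_names, field_bytes):
    converters = build_converters_buggy(schema_field_names)
    routed = {}
    for field_index, name in enumerate(schema_field_names):
        # getConverter(fieldIndex): positional lookup into the array.
        routed[converters[field_index]] = field_bytes[name]
    return routed

# Spark-written file, [value, metadata]: bytes land where expected.
ok = read_variant(["value", "metadata"], {"value": b"V", "metadata": b"M"})
assert ok == {"value-converter": b"V", "metadata-converter": b"M"}

# Spec-compliant file, [metadata, value]: bytes are swapped.
bad = read_variant(["metadata", "value"], {"value": b"V", "metadata": b"M"})
assert bad == {"value-converter": b"M", "metadata-converter": b"V"}
```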
Root Cause
In ParquetUnshreddedVariantConverter, the converter array is built in hardcoded [value, metadata] order:
```scala
val valueAndMetadata = Seq("value", "metadata").map { colName =>
  val idx = (0 until parquetType.getFieldCount())
    .find(parquetType.getFieldName(_) == colName)
  // ...
}
Array(
  newConverter(valueAndMetadata(0), BinaryType, /* sets currentValue */),
  newConverter(valueAndMetadata(1), BinaryType, /* sets currentMetadata */)
)
```
The fields are correctly looked up by name, but the resulting converters are placed at fixed array positions [0, 1]. When Parquet calls getConverter(fieldIndex), fieldIndex refers to the field's position in the file schema, not to the hardcoded array order. If the schema is [metadata(0), value(1)], then getConverter(0) returns the value converter and feeds it the metadata bytes.
The same issue exists in ParquetToSparkSchemaConverter.convertVariantField, which builds ParquetColumn children in hardcoded [value, metadata] order regardless of the file schema.
Expected Behavior
The Variant converter should map converters to schema positions based on the name lookup, not assume a fixed [value, metadata] order. For example:
```scala
val converters = Array.ofDim[Converter](2)
val valueIdx = (0 until parquetType.getFieldCount())
  .find(parquetType.getFieldName(_) == "value").get
val metadataIdx = (0 until parquetType.getFieldCount())
  .find(parquetType.getFieldName(_) == "metadata").get
converters(valueIdx) = newConverter(parquetType.getType(valueIdx), BinaryType, /* sets currentValue */)
converters(metadataIdx) = newConverter(parquetType.getType(metadataIdx), BinaryType, /* sets currentMetadata */)
```
Context
The Parquet Variant spec and Iceberg both define the field order as [metadata, value]. Spark's own writer, however, emits [value, metadata], so the bug is masked when reading Spark-written files. It surfaces when reading spec-compliant files produced by other engines (e.g., Apache Hudi aligning with the Parquet spec).
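The name-based mapping described under Expected Behavior can be sketched with the same hypothetical Python simulation: placing each converter at the field's actual schema index makes the dispatch order-independent.

```python
# Hypothetical simulation of the name-based fix (not Spark code):
# each converter is placed at the index where its field sits in the schema.

def build_converters_fixed(schema_field_names):
    converters = [None, None]
    converters[schema_field_names.index("value")] = "value-converter"
    converters[schema_field_names.index("metadata")] = "metadata-converter"
    return converters

def read_variant_fixed(schema_field_names, field_bytes):
    converters = build_converters_fixed(schema_field_names)
    return {converters[i]: field_bytes[name]
            for i, name in enumerate(schema_field_names)}

# Both physical orders now route bytes to the correct converter.
for order in (["value", "metadata"], ["metadata", "value"]):
    routed = read_variant_fixed(order, {"value": b"V", "metadata": b"M"})
    assert routed == {"value-converter": b"V", "metadata-converter": b"M"}
```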
Workaround
Reorder the variant group fields to [value, metadata] in the requested Parquet schema before Spark sees it. Since parquet-mr reconciles the requested schema against the file schema by field name, the correct bytes flow to the correct converters despite the file having a different physical order.
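A minimal sketch of that workaround, again as a hypothetical Python simulation (the names and tuple shapes are illustrative, not Hudi code): reorder the variant group's fields to [value, metadata] in the requested schema, relying on parquet-mr's name-based reconciliation against the file schema.

```python
# Hypothetical sketch of the workaround (not Hudi code): force the
# requested schema's variant group into [value, metadata] order. Since
# parquet-mr reconciles requested vs. file schema by field name, the
# file's physical order no longer affects Spark's positional converters.

def reorder_variant_fields(requested_fields):
    # requested_fields: list of (name, type) pairs in the variant group.
    by_name = dict(requested_fields)
    if set(by_name) >= {"value", "metadata"}:
        return [("value", by_name["value"]), ("metadata", by_name["metadata"])]
    return requested_fields  # not a variant group; leave untouched

# Spec-compliant file order [metadata, value] gets rewritten.
file_order = [("metadata", "binary"), ("value", "binary")]
requested = reorder_variant_fields(file_order)
assert [name for name, _ in requested] == ["value", "metadata"]
```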
Affected Version
Spark 4.0.x. Believed to be fixed in 4.1.0+ via SPARK-54410 (VARIANT logical type read support).
Task Type
The workaround is merged here: dc94e25
Once Spark 4.1+ is supported in Hudi and parquet-java 1.16.0 is adopted, this workaround can be removed.
To reproduce the error without the workaround, run the test "Test Table with Variant Data Type" in org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala.
Error log:
```
org.apache.spark.SparkRuntimeException: [MALFORMED_VARIANT] Variant binary is malformed. Please check the data source is valid. SQLSTATE: 22023
	at org.apache.spark.types.variant.VariantUtil.malformedVariant(VariantUtil.java:179)
	at org.apache.spark.types.variant.Variant.<init>(Variant.java:62)
	at org.apache.spark.types.variant.Variant.<init>(Variant.java:53)
	at org.apache.spark.sql.catalyst.expressions.variant.VariantGet$.cast(variantExpressions.scala:410)
	at org.apache.spark.sql.catalyst.expressions.variant.VariantGet.cast(variantExpressions.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:41)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:342)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:918)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:918)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:340)
	at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:207)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:363)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:249)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:243)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:217)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:213)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$triggerFuture$3(ShuffleExchangeExec.scala:97)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$triggerFuture$1(ShuffleExchangeExec.scala:97)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
```