Skip to content

Commit fb48ca6

Browse files
committed
Fix update tests
1 parent cd2f826 commit fb48ca6

File tree

2 files changed: +16 additions, −15 deletions

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala

Lines changed: 2 additions & 6 deletions
@@ -215,9 +215,7 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
     })
   }
 
-  // TODO: Re-enable once variant merge/update support is fully implemented #18069
-  // Currently fails with NullPointerException in AvroDeserializer during merge operations
-  ignore("Test Variant Shredding with Update Operation") {
+  test("Test Variant Shredding with Update Operation") {
     assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
 
     withRecordType()(withTempDir { tmp =>
@@ -277,9 +275,7 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
     })
   }
 
-  // TODO: Re-enable once variant merge/update support is fully implemented #18069
-  // Currently fails with NullPointerException in AvroDeserializer during merge operations
-  ignore("Test Variant Shredding with Merge Operation") {
+  test("Test Variant Shredding with Merge Operation") {
     assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
 
     withRecordType()(withTempDir { tmp =>

hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 14 additions & 9 deletions
@@ -237,17 +237,22 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
         val record = value.asInstanceOf[IndexedRecord]
 
         val valueBuffer = record.get(valueIdx).asInstanceOf[ByteBuffer]
-        val valueBytes = new Array[Byte](valueBuffer.remaining)
-        valueBuffer.get(valueBytes)
-        valueBuffer.rewind()
+        // value field can be null during merging
+        if (valueBuffer == null) {
+          updater.setNullAt(ordinal)
+        } else {
+          val valueBytes = new Array[Byte](valueBuffer.remaining)
+          valueBuffer.get(valueBytes)
+          valueBuffer.rewind()
 
-        val metadataBuffer = record.get(metadataIdx).asInstanceOf[ByteBuffer]
-        val metadataBytes = new Array[Byte](metadataBuffer.remaining)
-        metadataBuffer.get(metadataBytes)
-        metadataBuffer.rewind()
+          val metadataBuffer = record.get(metadataIdx).asInstanceOf[ByteBuffer]
+          val metadataBytes = new Array[Byte](metadataBuffer.remaining)
+          metadataBuffer.get(metadataBytes)
+          metadataBuffer.rewind()
 
-        val variant = new VariantVal(valueBytes, metadataBytes)
-        updater.set(ordinal, variant)
+          val variant = new VariantVal(valueBytes, metadataBytes)
+          updater.set(ordinal, variant)
+        }
 
       case (RECORD, st: StructType) =>
         // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.

0 commit comments

Comments
 (0)