
Commit f79a417

feat(schema): Add read + write support for shredded variants for AVRO

- Added support to write shredded variant types for HoodieRecordType.AVRO
- Added functional tests covering the newly added configs
1 parent d4ba92b

16 files changed: +2544 −21 lines

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala

Lines changed: 37 additions & 0 deletions
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.HoodieSchemaConversionUtils.convertHoodieSchemaToStructType
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
@@ -398,11 +399,47 @@ object HoodieInternalRowUtils {
       (fieldUpdater, ordinal, value) =>
         fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))
 
+      // Handle conversion from VariantType to variant struct representation
+      case (newStructType: StructType, _) if sparkAdapter.isVariantType(prevDataType) && looksLikeVariantStruct(newStructType) =>
+        (fieldUpdater, ordinal, value) => {
+          if (value == null) {
+            fieldUpdater.setNullAt(ordinal)
+          } else {
+            val row = sparkAdapter.convertVariantToStruct(value, newStructType)
+            fieldUpdater.set(ordinal, row)
+          }
+        }
+
+      // Handle conversion from variant struct representation to VariantType
+      case (_, prevStructType: StructType) if sparkAdapter.isVariantType(newDataType) && looksLikeVariantStruct(prevStructType) =>
+        (fieldUpdater, ordinal, value) => {
+          if (value == null) {
+            fieldUpdater.setNullAt(ordinal)
+          } else {
+            val row = value.asInstanceOf[InternalRow]
+            val variant = sparkAdapter.convertStructToVariant(row, prevStructType)
+            fieldUpdater.set(ordinal, variant)
+          }
+        }
+
       case (_, _) =>
         throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
     }
   }
 
+  /**
+   * Checks if a StructType looks like a variant representation (has value and metadata binary fields).
+   * This is a structural check that doesn't rely on metadata, useful during schema reconciliation
+   * when toggling between shredded/unshredded formats or merging data with different representations.
+   */
+  private def looksLikeVariantStruct(structType: StructType): Boolean = {
+    structType.fields.length >= 2 &&
+      structType.fieldNames.contains("value") &&
+      structType.fieldNames.contains("metadata") &&
+      structType("value").dataType == BinaryType &&
+      structType("metadata").dataType == BinaryType
+  }
+
   private def lookupRenamedField(newFieldName: String,
                                  newFieldQualifiedName: String,
                                  renamedColumnsMap: JMap[String, String]): String = {
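
Both shredded and unshredded variant layouts carry the same two binary fields, which is why the helper above checks structure rather than field metadata. Below is a self-contained sketch of how that check behaves on the different layouts; the sample schemas, including the LongType typed_value placeholder, are illustrative and not the exact shapes Hudi derives:

import org.apache.spark.sql.types._

object VariantStructCheckExample {
  // Mirrors the helper from the diff above: a struct "looks like" a variant
  // representation when it carries binary `value` and `metadata` fields.
  def looksLikeVariantStruct(structType: StructType): Boolean = {
    structType.fields.length >= 2 &&
      structType.fieldNames.contains("value") &&
      structType.fieldNames.contains("metadata") &&
      structType("value").dataType == BinaryType &&
      structType("metadata").dataType == BinaryType
  }

  def main(args: Array[String]): Unit = {
    // Unshredded variant representation: just the two binary fields.
    val unshredded = StructType(Seq(
      StructField("value", BinaryType),
      StructField("metadata", BinaryType)))

    // Shredded variant representation: the same two fields plus a typed_value
    // column (the LongType here is a placeholder; the real shape is per-dataset).
    val shredded = unshredded.add(StructField("typed_value", LongType))

    // An ordinary struct that merely has a `value` field does not match.
    val ordinary = StructType(Seq(
      StructField("value", StringType),
      StructField("metadata", BinaryType)))

    println(looksLikeVariantStruct(unshredded)) // true
    println(looksLikeVariantStruct(shredded))   // true
    println(looksLikeVariantStruct(ordinary))   // false: value is not binary
  }
}

Because the check is purely structural, it matches both layouts during reconciliation without needing schema annotations to survive a round trip.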

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala

Lines changed: 26 additions & 0 deletions
@@ -483,4 +483,30 @@ trait SparkAdapter extends Serializable {
       shreddedStructType: StructType,
       writeStruct: Consumer[InternalRow]
   ): BiConsumer[SpecializedGetters, Integer]
+
+  /**
+   * Converts a Variant value to a struct representation (value and metadata binary fields).
+   * This is used during schema reconciliation when converting from VariantType to StructType.
+   *
+   * For Spark 4.x, extracts value and metadata bytes from VariantVal and creates an InternalRow.
+   * For Spark 3.x, this throws UnsupportedOperationException.
+   *
+   * @param variantValue The variant value object
+   * @param structType   The target StructType with value and metadata fields
+   * @return InternalRow with value and metadata fields populated
+   */
+  def convertVariantToStruct(variantValue: Any, structType: StructType): InternalRow
+
+  /**
+   * Converts a struct representation (value and metadata binary fields) to a Variant value.
+   * This is used during schema reconciliation when converting from StructType to VariantType.
+   *
+   * For Spark 4.x, extracts value and metadata bytes from InternalRow and creates a VariantVal.
+   * For Spark 3.x, this throws UnsupportedOperationException.
+   *
+   * @param structRow  The InternalRow with value and metadata fields
+   * @param structType The StructType of the struct
+   * @return Variant value object
+   */
+  def convertStructToVariant(structRow: InternalRow, structType: StructType): Any
 }
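
For a sense of what the Spark 4.x side of this contract could look like, here is a rough sketch assuming Spark's org.apache.spark.unsafe.types.VariantVal with its (value, metadata) constructor and getValue/getMetadata accessors; the trait name and field-ordinal lookups are hypothetical, since the commit's concrete adapter implementation isn't shown in this excerpt:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.VariantVal

// Hypothetical Spark 4.x adapter sketch, not the commit's actual code.
trait Spark4VariantConversions {

  // VariantType -> struct: copy the variant's two byte arrays into a row
  // laid out to match the target struct's value/metadata field positions.
  def convertVariantToStruct(variantValue: Any, structType: StructType): InternalRow = {
    val variant = variantValue.asInstanceOf[VariantVal]
    val values = new Array[Any](structType.length)
    values(structType.fieldIndex("value")) = variant.getValue
    values(structType.fieldIndex("metadata")) = variant.getMetadata
    new GenericInternalRow(values)
  }

  // struct -> VariantType: read the two binary fields back out of the row
  // and rebuild the VariantVal.
  def convertStructToVariant(structRow: InternalRow, structType: StructType): Any = {
    val value = structRow.getBinary(structType.fieldIndex("value"))
    val metadata = structRow.getBinary(structType.fieldIndex("metadata"))
    new VariantVal(value, metadata)
  }
}

A Spark 3.x adapter would instead override both methods to throw UnsupportedOperationException, as the scaladoc above specifies.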

hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java

Lines changed: 10 additions & 0 deletions
@@ -198,6 +198,16 @@ public class HoodieStorageConfig extends HoodieConfig {
           + "When disabled, only unshredded variant data can be read. "
           + "Equivalent to Spark's spark.sql.variant.allowReadingShredded.");
 
+  public static final ConfigProperty<String> PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS = ConfigProperty
+      .key("hoodie.parquet.variant.shredding.provider.class")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Fully-qualified class name of the VariantShreddingProvider implementation "
+          + "used to shred variant values at write time in the Avro record path. "
+          + "The provider parses variant binary data and populates typed_value columns. "
+          + "When not set, the provider is auto-detected from the classpath.");
+
   public static final ConfigProperty<Boolean> WRITE_UTC_TIMEZONE = ConfigProperty
       .key("hoodie.parquet.write.utc-timezone.enabled")
       .defaultValue(true)
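
A hedged usage sketch of the new config on the write path follows; com.example.MyVariantShreddingProvider, the table name, and the record key field are placeholders, not part of this commit:

import org.apache.spark.sql.{DataFrame, SaveMode}

object ShreddingProviderConfigExample {
  // Hypothetical usage: com.example.MyVariantShreddingProvider stands in for a
  // user-supplied VariantShreddingProvider implementation on the classpath.
  def writeWithProvider(df: DataFrame, basePath: String): Unit = {
    df.write.format("hudi")
      .option("hoodie.table.name", "variant_tbl")
      .option("hoodie.datasource.write.recordkey.field", "id")
      // Override classpath auto-detection with an explicit provider class.
      .option("hoodie.parquet.variant.shredding.provider.class",
        "com.example.MyVariantShreddingProvider")
      .mode(SaveMode.Append)
      .save(basePath)
  }
}

Leaving the option unset falls back to the documented auto-detection behavior.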
