@@ -324,6 +324,8 @@ public static DataType convertToDataType(HoodieSchema hoodieSchema) {
return convertRecord(hoodieSchema);
case UNION:
return convertUnion(hoodieSchema);
case VARIANT:
return convertVariant(hoodieSchema);
default:
throw new IllegalArgumentException("Unsupported HoodieSchemaType: " + type);
}
@@ -445,6 +447,25 @@ private static DataType convertUnion(HoodieSchema schema) {
return nullable ? rawDataType.nullable() : rawDataType;
}

/**
* Converts a Variant schema to Flink's ROW type.
* Variant is represented as ROW<`value` BYTES, `metadata` BYTES> in Flink.
*
* @param schema HoodieSchema to convert (must be a VARIANT type)
* @return DataType representing the Variant as a ROW with binary fields
*/
private static DataType convertVariant(HoodieSchema schema) {
if (schema.getType() != HoodieSchemaType.VARIANT) {
throw new IllegalStateException("Expected HoodieSchema.Variant but got: " + schema.getClass());
}

// Variant is stored as a struct with two binary fields: value and metadata
return DataTypes.ROW(
DataTypes.FIELD("value", DataTypes.BYTES().notNull()),
DataTypes.FIELD("metadata", DataTypes.BYTES().notNull())
).notNull();
}
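// Illustrative sketch only (not part of this change): with the mapping above, a Hudi VARIANT column
// `v` would surface in Flink SQL roughly as
//   `v` ROW<`value` BYTES NOT NULL, `metadata` BYTES NOT NULL>
// i.e. readers see the raw Variant binary value plus its Variant metadata as two binary fields.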

/**
* Returns true if all the types are RECORD type with same number of fields.
*/
@@ -18,6 +18,7 @@

package org.apache.hudi.io.storage.row;

import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
@@ -73,11 +74,15 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import scala.Enumeration;
import scala.Function1;

import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
import static org.apache.hudi.config.HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD;
import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_STRING;
import static org.apache.hudi.config.HoodieWriteConfig.INTERNAL_SCHEMA_STRING;
@@ -121,6 +126,14 @@ public class HoodieRowParquetWriteSupport extends WriteSupport<InternalRow> {
private final ValueWriter[] rootFieldWriters;
private final HoodieSchema schema;
private final StructType structType;
/**
* The shredded schema. When Variant columns are configured for shredding, this schema has those VariantType columns replaced with their shredded struct schemas.
* <p>
* For non-shredded cases, this is identical to structType.
*/
private final StructType shreddedSchema;
private final boolean variantWriteShreddingEnabled;
private final String variantForceShreddingSchemaForTest;
private RecordConsumer recordConsumer;

public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
@@ -129,6 +142,16 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeLegacyFormatEnabled);
hadoopConf.set("spark.sql.parquet.outputTimestampType", config.getStringOrDefault(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", config.getStringOrDefault(PARQUET_FIELD_ID_WRITE_ENABLED));

// Variant shredding configs
this.variantWriteShreddingEnabled = config.getBooleanOrDefault(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED);
this.variantForceShreddingSchemaForTest = config.getString(PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST);
hadoopConf.setBoolean("spark.sql.variant.writeShredding.enabled", variantWriteShreddingEnabled);
hadoopConf.setBoolean("spark.sql.variant.allowReadingShredded", config.getBooleanOrDefault(PARQUET_VARIANT_ALLOW_READING_SHREDDED));
if (variantForceShreddingSchemaForTest != null && !variantForceShreddingSchemaForTest.isEmpty()) {
hadoopConf.set("spark.sql.variant.forceShreddingSchemaForTest", variantForceShreddingSchemaForTest);
}

this.writeLegacyListFormat = Boolean.parseBoolean(writeLegacyFormatEnabled)
|| Boolean.parseBoolean(config.getStringOrDefault(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, "false"));
this.structType = structType;
@@ -139,21 +162,118 @@
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
// Generate shredded schema if there are shredded Variant columns
this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
// Use shreddedSchema for creating writers when shredded Variants are present
this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
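// Usage sketch (illustrative values, not taken from this patch): a job that wants shredded Variant
// columns would leave the master switch on (its documented default) and, for tests only, may force a
// shredding schema, e.g.
//   hoodie.parquet.variant.write.shredding.enabled = true
//   hoodie.parquet.variant.force.shredding.schema.for.test = "a INT, b STRING"
// The forced DDL (a hypothetical example here) overrides schema-driven shredding for every Variant column.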

/**
* Generates a shredded schema from the given structType and hoodieSchema.
* <p>
* For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema.
* <p>
* Shredding behavior is controlled by:
* <ul>
* <li>{@code hoodie.parquet.variant.write.shredding.enabled} - Master switch for shredding (default: true).
* When false, no shredding happens regardless of schema configuration.</li>
* <li>{@code hoodie.parquet.variant.force.shredding.schema.for.test} - When set, forces this DDL schema
* as the typed_value schema for ALL variant columns, overriding schema-driven shredding.</li>
* </ul>
*
* @param structType The original Spark StructType
* @param hoodieSchema The HoodieSchema containing shredding information
* @return A StructType with shredded Variant fields replaced by their shredded schemas
*/
private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) {
// If write shredding is disabled, skip shredding entirely
if (!variantWriteShreddingEnabled) {
return structType;
}

// Parse forced shredding schema if configured
StructType forcedShreddingSchema = null;
if (variantForceShreddingSchemaForTest != null && !variantForceShreddingSchemaForTest.isEmpty()) {
forcedShreddingSchema = StructType.fromDDL(variantForceShreddingSchemaForTest);
}

StructField[] fields = structType.fields();
StructField[] shreddedFields = new StructField[fields.length];
boolean hasShredding = false;

for (int i = 0; i < fields.length; i++) {
StructField field = fields[i];
DataType dataType = field.dataType();

// Check if this is a Variant field that should be shredded
if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
// If a forced shredding schema is configured, use it for all variant columns
if (forcedShreddingSchema != null) {
StructType markedShreddedStruct = SparkAdapterSupport$.MODULE$.sparkAdapter()
.generateVariantWriteShreddingSchema(forcedShreddingSchema, true, false);
shreddedFields[i] = new StructField(field.name(), markedShreddedStruct, field.nullable(), field.metadata());
hasShredding = true;
continue;
}

// Otherwise, use schema-driven shredding
HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
.flatMap(s -> s.getField(field.name()))
.map(f -> f.schema())
.orElse(null);

if (fieldHoodieSchema != null && fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) {
HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) fieldHoodieSchema;
if (variantSchema.isShredded() && variantSchema.getTypedValueField().isPresent()) {
// Use plain types for SparkShreddingUtils (unwraps nested {value, typed_value} structs if present)
HoodieSchema typedValueSchema = variantSchema.getPlainTypedValueSchema().get();
DataType typedValueDataType = HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(typedValueSchema);

// Generate the shredding schema with write metadata using SparkAdapter
StructType markedShreddedStruct = SparkAdapterSupport$.MODULE$.sparkAdapter()
.generateVariantWriteShreddingSchema(typedValueDataType, true, false);

shreddedFields[i] = new StructField(field.name(), markedShreddedStruct, field.nullable(), field.metadata());
hasShredding = true;
continue;
}
}
}

// Not a shredded Variant, keep the original field
shreddedFields[i] = field;
}

return hasShredding ? new StructType(shreddedFields) : structType;
}

/**
* Creates field writers for each field in the schema.
*
* @param schema The schema to create writers for (may contain shredded Variant struct types)
* @param hoodieSchema The HoodieSchema for type information
* @return Array of ValueWriters for each field
*/
private ValueWriter[] getFieldWriters(StructType schema, HoodieSchema hoodieSchema) {
StructField[] fields = schema.fields();
ValueWriter[] writers = new ValueWriter[fields.length];

for (int i = 0; i < fields.length; i++) {
StructField field = fields[i];

HoodieSchema fieldSchema = Option.ofNullable(hoodieSchema)
.flatMap(s -> s.getField(field.name()))
// Note: Cannot use HoodieSchemaField::schema method reference due to Java 17 compilation ambiguity
.map(f -> f.schema())
.orElse(null);

writers[i] = makeWriter(fieldSchema, field.dataType());
}

return writers;
}

@Override
@@ -166,7 +286,9 @@ public WriteContext init(Configuration configuration) {
}
Configuration configurationCopy = new Configuration(configuration);
configurationCopy.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, Boolean.toString(writeLegacyListFormat));
// Use shreddedSchema for Parquet schema conversion when shredding is enabled
// This ensures the Parquet file structure includes the shredded typed_value columns
MessageType messageType = convert(shreddedSchema, schema);
return new WriteContext(messageType, metadata);
}

@@ -281,6 +403,18 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
} else if (dataType == DataTypes.BinaryType) {
return (row, ordinal) -> recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getBinary(ordinal)));
} else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
// Maps VariantType to a group containing 'metadata' and 'value' fields.
// This ensures Spark 4.0 compatibility and supports both Shredded and Unshredded schemas.
// Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs.
BiConsumer<SpecializedGetters, Integer> variantWriter = SparkAdapterSupport$.MODULE$.sparkAdapter().createVariantValueWriter(
dataType,
valueBytes -> consumeField("value", 0, () -> recordConsumer.addBinary(Binary.fromReusedByteArray(valueBytes))),
metadataBytes -> consumeField("metadata", 1, () -> recordConsumer.addBinary(Binary.fromReusedByteArray(metadataBytes)))
);
return (row, ordinal) -> {
consumeGroup(() -> variantWriter.accept(row, ordinal));
};
} else if (dataType instanceof DecimalType) {
return (row, ordinal) -> {
int precision = ((DecimalType) dataType).precision();
@@ -337,6 +471,9 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
}
});
};
} else if (dataType instanceof StructType
&& SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantShreddingStruct((StructType) dataType)) {
return makeShreddedVariantWriter((StructType) dataType);
} else if (dataType instanceof StructType) {
StructType structType = (StructType) dataType;
ValueWriter[] fieldWriters = getFieldWriters(structType, resolvedSchema);
@@ -349,6 +486,33 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
}
}

/**
* Creates a ValueWriter for a shredded Variant column.
* This writer converts a Variant value into its shredded components (metadata, value, typed_value) and writes them to Parquet.
*
* @param shreddedStructType The shredded StructType (with shredding metadata)
* @return A ValueWriter that handles shredded Variant writing
*/
private ValueWriter makeShreddedVariantWriter(StructType shreddedStructType) {
// Create writers for the shredded struct fields
// The shreddedStructType contains: metadata (binary), value (binary), typed_value (optional)
ValueWriter[] shreddedFieldWriters = Arrays.stream(shreddedStructType.fields())
.map(field -> makeWriter(null, field.dataType()))
.toArray(ValueWriter[]::new);

// Use the SparkAdapter to create a shredded variant writer that converts Variant to shredded components
BiConsumer<SpecializedGetters, Integer> shreddedWriter = SparkAdapterSupport$.MODULE$.sparkAdapter()
.createShreddedVariantWriter(
shreddedStructType,
shreddedRow -> {
// Write the shredded row as a group
consumeGroup(() -> writeFields(shreddedRow, shreddedStructType, shreddedFieldWriters));
}
);

return shreddedWriter::accept;
}
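// Rough data flow (an assumption based on the adapter contract above, not asserted by this patch):
// the adapter shreds the row's Variant value against shreddedStructType, hands the resulting
// InternalRow (metadata, value, typed_value) to the callback, and the per-field writers emit it as a
// Parquet group.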

private ValueWriter twoLevelArrayWriter(String repeatedFieldName, ValueWriter elementWriter) {
return (row, ordinal) -> {
ArrayData array = row.getArray(ordinal);
@@ -510,6 +674,13 @@ private Type convertField(HoodieSchema fieldSchema, StructField structField, Typ
.as(LogicalTypeAnnotation.stringType()).named(structField.name());
} else if (dataType == DataTypes.BinaryType) {
return Types.primitive(BINARY, repetition).named(structField.name());
} else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
return SparkAdapterSupport$.MODULE$.sparkAdapter().convertVariantFieldToParquetType(
dataType,
structField.name(),
resolvedSchema,
repetition
);
} else if (dataType instanceof DecimalType) {
int precision = ((DecimalType) dataType).precision();
int scale = ((DecimalType) dataType).scale();
@@ -21,7 +21,6 @@ package org.apache.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, filterIsSafeForPrimaryKey, getAppliedRequiredSchema}
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.HoodieSchemaConversionUtils.convertHoodieSchemaToStructType
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
@@ -398,11 +399,47 @@ object HoodieInternalRowUtils {
(fieldUpdater, ordinal, value) =>
fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))

// Handle conversion from VariantType to variant struct representation
case (newStructType: StructType, _) if sparkAdapter.isVariantType(prevDataType) && looksLikeVariantStruct(newStructType) =>
(fieldUpdater, ordinal, value) => {
if (value == null) {
fieldUpdater.setNullAt(ordinal)
} else {
val row = sparkAdapter.convertVariantToStruct(value, newStructType)
fieldUpdater.set(ordinal, row)
}
}

// Handle conversion from variant struct representation to VariantType
case (_, prevStructType: StructType) if sparkAdapter.isVariantType(newDataType) && looksLikeVariantStruct(prevStructType) =>
(fieldUpdater, ordinal, value) => {
if (value == null) {
fieldUpdater.setNullAt(ordinal)
} else {
val row = value.asInstanceOf[InternalRow]
val variant = sparkAdapter.convertStructToVariant(row, prevStructType)
fieldUpdater.set(ordinal, variant)
}
}

case (_, _) =>
throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
}
}

/**
* Checks if a StructType looks like a variant representation (has value and metadata binary fields).
* This is a structural check that doesn't rely on metadata, useful during schema reconciliation
* when toggling between shredded/unshredded formats or merging data with different representations.
*/
// Review comment (Contributor): Is it possible to use the struct's metadata to be more direct about what is or isn't a variant?
private def looksLikeVariantStruct(structType: StructType): Boolean = {
structType.fields.length >= 2 &&
structType.fieldNames.contains("value") &&
structType.fieldNames.contains("metadata") &&
structType("value").dataType == BinaryType &&
structType("metadata").dataType == BinaryType
}
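// Illustration (not from this patch): both the plain unshredded shape
//   StructType(Seq(StructField("value", BinaryType), StructField("metadata", BinaryType)))
// and a shredded shape that additionally carries a typed_value field satisfy this check, which is
// what lets the cases above rewrite between the two representations.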

private def lookupRenamedField(newFieldName: String,
newFieldQualifiedName: String,
renamedColumnsMap: JMap[String, String]): String = {
@@ -18,11 +18,11 @@

package org.apache.spark.sql

import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.types.StructType

/**