Skip to content

Commit c89d0ea

Browse files
committed
Get Spark to Hoodie schema round-trip conversion working
1 parent 2d446dc commit c89d0ea

File tree

3 files changed

+133
-5
lines changed

3 files changed

+133
-5
lines changed

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.spark.sql.execution.datasources.parquet.SparkBasicSchemaEvolution;
5858
import org.apache.spark.sql.internal.SQLConf;
5959
import org.apache.spark.sql.sources.Filter;
60+
import org.apache.spark.sql.types.Metadata;
6061
import org.apache.spark.sql.types.StructType;
6162

6263
import java.io.IOException;
@@ -204,7 +205,7 @@ public HoodieSchema getSchema() {
204205
MessageType messageType = getFileSchema();
205206
StructType structType = getStructSchema();
206207
schemaOption = Option.of(HoodieSparkSchemaConverters.toHoodieType(
207-
structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
208+
structType, true, messageType.getName(), StringUtils.EMPTY_STRING, Metadata.empty()));
208209
}
209210
return schemaOption.get();
210211
}

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,24 @@ object HoodieSparkSchemaConverters {
229229
val newRecordNames = existingRecordNames + fullName
230230
val fields = hoodieSchema.getFields.asScala.map { f =>
231231
val schemaType = toSqlTypeHelper(f.schema(), newRecordNames)
232-
val metadata = if (f.doc().isPresent && !f.doc().get().isEmpty) {
233-
new MetadataBuilder().putString("comment", f.doc().get()).build()
234-
} else {
235-
Metadata.empty
232+
// Merge type-specific metadata (e.g., vector dimension) with field doc comment
233+
val metadata = schemaType.metadata match {
234+
case Some(typeMetadata) =>
235+
// If there's type-specific metadata, use it as base
236+
if (f.doc().isPresent && !f.doc().get().isEmpty) {
237+
// Merge doc comment into the metadata
238+
new MetadataBuilder().withMetadata(typeMetadata)
239+
.putString("comment", f.doc().get()).build()
240+
} else {
241+
typeMetadata
242+
}
243+
case None =>
244+
// No type-specific metadata, just use doc comment if present
245+
if (f.doc().isPresent && !f.doc().get().isEmpty) {
246+
new MetadataBuilder().putString("comment", f.doc().get()).build()
247+
} else {
248+
Metadata.empty
249+
}
236250
}
237251
StructField(f.name(), schemaType.dataType, schemaType.nullable, metadata)
238252
}

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSchemaConversionUtils.scala

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,119 @@ class TestHoodieSchemaConversionUtils extends FunSuite with Matchers {
570570
internalRowCompare(row1, row2, sparkSchema)
571571
}
572572

573+
test("test VECTOR type conversion - Spark to HoodieSchema") {
574+
val metadata = new MetadataBuilder()
575+
.putLong("hoodie.vector.dimension", 128)
576+
.build()
577+
val struct = new StructType()
578+
.add("id", IntegerType, false)
579+
.add("embedding", ArrayType(FloatType, containsNull = false), nullable = false, metadata)
580+
581+
val hoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
582+
struct, "VectorTest", "test")
583+
584+
// Verify the embedding field is VECTOR type
585+
val embeddingField = hoodieSchema.getField("embedding").get()
586+
assert(embeddingField.schema().getType == HoodieSchemaType.VECTOR)
587+
assert(embeddingField.schema().isInstanceOf[HoodieSchema.Vector])
588+
589+
val vectorSchema = embeddingField.schema().asInstanceOf[HoodieSchema.Vector]
590+
assert(vectorSchema.getDimension == 128)
591+
assert(vectorSchema.getVectorElementType == HoodieSchema.Vector.ELEMENT_TYPE_FLOAT)
592+
assert(!embeddingField.isNullable())
593+
}
594+
595+
test("test VECTOR type conversion - HoodieSchema to Spark") {
596+
val vectorSchema = HoodieSchema.createVector(256, HoodieSchema.Vector.ELEMENT_TYPE_FLOAT)
597+
val fields = java.util.Arrays.asList(
598+
HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.INT)),
599+
HoodieSchemaField.of("embedding", vectorSchema)
600+
)
601+
val hoodieSchema = HoodieSchema.createRecord("VectorTest", "test", null, fields)
602+
603+
val structType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
604+
605+
// Verify the embedding field is ArrayType(FloatType)
606+
assert(structType.fields.length == 2)
607+
val embeddingField = structType.fields(1)
608+
assert(embeddingField.name == "embedding")
609+
assert(embeddingField.dataType.isInstanceOf[ArrayType])
610+
assert(!embeddingField.nullable)
611+
612+
val arrayType = embeddingField.dataType.asInstanceOf[ArrayType]
613+
assert(arrayType.elementType == FloatType)
614+
assert(!arrayType.containsNull)
615+
616+
// Verify metadata contains dimension
617+
assert(embeddingField.metadata.contains("hoodie.vector.dimension"))
618+
assert(embeddingField.metadata.getLong("hoodie.vector.dimension") == 256)
619+
}
620+
621+
test("test VECTOR round-trip conversion - Spark to HoodieSchema to Spark") {
622+
val metadata = new MetadataBuilder()
623+
.putLong("hoodie.vector.dimension", 512)
624+
.build()
625+
val originalStruct = new StructType()
626+
.add("id", LongType, false)
627+
.add("vector_field", ArrayType(FloatType, containsNull = false), nullable = false, metadata)
628+
.add("name", StringType, true)
629+
630+
// Convert Spark -> HoodieSchema -> Spark
631+
val hoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
632+
originalStruct, "RoundTripTest", "test")
633+
val convertedStruct = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema)
634+
635+
// Verify structure is preserved
636+
assert(convertedStruct.fields.length == originalStruct.fields.length)
637+
638+
// Verify vector field properties
639+
val originalVectorField = originalStruct.fields(1)
640+
val convertedVectorField = convertedStruct.fields(1)
641+
642+
assert(convertedVectorField.name == originalVectorField.name)
643+
assert(convertedVectorField.dataType == originalVectorField.dataType)
644+
assert(convertedVectorField.nullable == originalVectorField.nullable)
645+
646+
// Verify metadata is preserved
647+
assert(convertedVectorField.metadata.contains("hoodie.vector.dimension"))
648+
assert(convertedVectorField.metadata.getLong("hoodie.vector.dimension") == 512)
649+
650+
// Verify array properties
651+
val convertedArrayType = convertedVectorField.dataType.asInstanceOf[ArrayType]
652+
assert(convertedArrayType.elementType == FloatType)
653+
assert(!convertedArrayType.containsNull)
654+
}
655+
656+
test("test VECTOR round-trip conversion - HoodieSchema to Spark to HoodieSchema") {
657+
val originalVectorSchema = HoodieSchema.createVector(1024, HoodieSchema.Vector.ELEMENT_TYPE_FLOAT)
658+
val fields = java.util.Arrays.asList(
659+
HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.LONG)),
660+
HoodieSchemaField.of("embedding", originalVectorSchema),
661+
HoodieSchemaField.of("metadata", HoodieSchema.createNullable(HoodieSchemaType.STRING))
662+
)
663+
val originalHoodieSchema = HoodieSchema.createRecord("RoundTripTest", "test", null, fields)
664+
665+
// Convert HoodieSchema -> Spark -> HoodieSchema
666+
val sparkStruct = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(originalHoodieSchema)
667+
val convertedHoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
668+
sparkStruct, "RoundTripTest", "test")
669+
670+
// Verify the vector field is preserved
671+
val convertedEmbeddingField = convertedHoodieSchema.getField("embedding").get()
672+
assert(convertedEmbeddingField.schema().getType == HoodieSchemaType.VECTOR)
673+
assert(convertedEmbeddingField.schema().isInstanceOf[HoodieSchema.Vector])
674+
675+
val convertedVectorSchema = convertedEmbeddingField.schema().asInstanceOf[HoodieSchema.Vector]
676+
assert(convertedVectorSchema.getDimension == 1024)
677+
assert(convertedVectorSchema.getVectorElementType == HoodieSchema.Vector.ELEMENT_TYPE_FLOAT)
678+
assert(!convertedEmbeddingField.isNullable())
679+
680+
// Verify other fields are preserved
681+
assert(convertedHoodieSchema.getFields.size() == 3)
682+
assert(convertedHoodieSchema.getField("id").get().schema().getType == HoodieSchemaType.LONG)
683+
assert(convertedHoodieSchema.getField("metadata").get().isNullable())
684+
}
685+
573686
private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = {
574687
schema match {
575688
case StructType(fields) =>

0 commit comments

Comments (0)