
Commit 6605bcf

feat(schema): Add read + write support for shredded variants for AVRO
- Added support to write shredded types for HoodieRecordType.AVRO
- Added functional tests for testing newly added configs
1 parent d4ba92b commit 6605bcf

File tree: 9 files changed, +1093 −12 lines

hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java

Lines changed: 10 additions & 0 deletions
@@ -198,6 +198,16 @@ public class HoodieStorageConfig extends HoodieConfig {
           + "When disabled, only unshredded variant data can be read. "
           + "Equivalent to Spark's spark.sql.variant.allowReadingShredded.");
 
+  public static final ConfigProperty<String> PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS = ConfigProperty
+      .key("hoodie.parquet.variant.shredding.provider.class")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Fully-qualified class name of the VariantShreddingProvider implementation "
+          + "used to shred variant values at write time in the Avro record path. "
+          + "The provider parses variant binary data and populates typed_value columns. "
+          + "When not set, the provider is auto-detected from the classpath.");
+
   public static final ConfigProperty<Boolean> WRITE_UTC_TIMEZONE = ConfigProperty
       .key("hoodie.parquet.write.utc-timezone.enabled")
       .defaultValue(true)
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java

Lines changed: 393 additions & 3 deletions
Large diffs are not rendered by default.
org/apache/hudi/avro/VariantShreddingProvider.java (new file)

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hudi.avro;

import org.apache.hudi.common.schema.HoodieSchema;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

/**
 * Interface for shredding variant values at write time.
 * <p>
 * Implementations parse variant binary data (value + metadata bytes) and produce
 * a shredded {@link GenericRecord} with typed_value columns populated according
 * to the shredding schema.
 * <p>
 * This interface allows the variant binary parsing logic (which may depend on
 * engine-specific libraries like Spark's variant module) to be loaded via reflection,
 * keeping the core write support free of engine-specific dependencies.
 */
public interface VariantShreddingProvider {

  /**
   * Transform an unshredded variant GenericRecord into a shredded one.
   * <p>
   * The input record is expected to have:
   * <ul>
   *   <li>{@code value}: ByteBuffer containing the variant value binary</li>
   *   <li>{@code metadata}: ByteBuffer containing the variant metadata binary</li>
   * </ul>
   * <p>
   * The output record should conform to {@code shreddedSchema} and have:
   * <ul>
   *   <li>{@code value}: ByteBuffer or null (null when typed_value captures the full value)</li>
   *   <li>{@code metadata}: ByteBuffer (always present)</li>
   *   <li>{@code typed_value}: the typed representation extracted from the variant binary,
   *       or null if the variant type does not match the typed_value schema</li>
   * </ul>
   *
   * @param unshreddedVariant GenericRecord with {value: ByteBuffer, metadata: ByteBuffer}
   * @param shreddedSchema    target Avro schema with {value: nullable ByteBuffer, metadata: ByteBuffer, typed_value: type}
   * @param variantSchema     HoodieSchema.Variant containing the shredding schema information
   * @return a GenericRecord conforming to shreddedSchema with typed_value populated where possible
   */
  GenericRecord shredVariantRecord(
      GenericRecord unshreddedVariant,
      Schema shreddedSchema,
      HoodieSchema.Variant variantSchema);
}
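
To make the contract concrete, here is a minimal, hypothetical implementation sketch in Scala (the class name PassThroughVariantShreddingProvider is invented for illustration): it copies the binary value and metadata through and leaves typed_value null, which the javadoc above permits when nothing is extracted into the typed representation. A real provider, such as the Spark 4 one wired up by the writer factory below, would instead parse the variant binary and populate typed_value according to the shredding schema.

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecord}
    import org.apache.hudi.avro.VariantShreddingProvider
    import org.apache.hudi.common.schema.HoodieSchema

    // Hypothetical pass-through provider: keeps the variant in its binary form.
    class PassThroughVariantShreddingProvider extends VariantShreddingProvider {
      override def shredVariantRecord(unshreddedVariant: GenericRecord,
                                      shreddedSchema: Schema,
                                      variantSchema: HoodieSchema.Variant): GenericRecord = {
        val shredded = new GenericData.Record(shreddedSchema)
        // Copy the variant value and metadata binaries through unchanged.
        shredded.put("value", unshreddedVariant.get("value"))
        shredded.put("metadata", unshreddedVariant.get("metadata"))
        // A null typed_value signals that nothing was extracted into the typed column.
        shredded.put("typed_value", null)
        shredded
      }
    }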

hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java

Lines changed: 30 additions & 1 deletion
@@ -49,6 +49,7 @@
 import java.util.Properties;
 
 import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WRITER_TO_ALLOW_DUPLICATES;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
 import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
 
 public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
@@ -123,9 +124,37 @@ protected HoodieFileWriter newOrcFileWriter(
   private HoodieAvroWriteSupport getHoodieAvroWriteSupport(HoodieSchema schema,
                                                            HoodieConfig config, boolean enableBloomFilter) {
     Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
+    HoodieSchema effectiveSchema = HoodieAvroWriteSupport.generateEffectiveSchema(schema, config);
+    Properties props = config.getProps();
+    // Auto-detect variant shredding provider from classpath if not explicitly configured
+    if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) {
+      String detected = detectShreddingProvider();
+      if (detected != null) {
+        props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(), detected);
+      }
+    }
     return (HoodieAvroWriteSupport) ReflectionUtils.loadClass(
         config.getStringOrDefault(HoodieStorageConfig.HOODIE_AVRO_WRITE_SUPPORT_CLASS),
         new Class<?>[] {MessageType.class, HoodieSchema.class, Option.class, Properties.class},
-        getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(schema), schema, filter, config.getProps());
+        getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(effectiveSchema), schema, filter, props);
+  }
+
+  /**
+   * Auto-detect a {@link org.apache.hudi.avro.VariantShreddingProvider} implementation
+   * available on the classpath. Returns the fully-qualified class name if found, or null.
+   */
+  private static String detectShreddingProvider() {
+    String[] candidates = {
+        "org.apache.hudi.variant.Spark4VariantShreddingProvider"
+    };
+    for (String candidate : candidates) {
+      try {
+        Class.forName(candidate);
+        return candidate;
+      } catch (ClassNotFoundException e) {
+        // not on classpath, try next
+      }
+    }
+    return null;
   }
 }
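
Since the candidate list only probes org.apache.hudi.variant.Spark4VariantShreddingProvider, a deployment that ships its own provider would have to set the class explicitly. A sketch of doing that through writer options, where df, basePath, and com.example.MyVariantShreddingProvider are placeholders, and the usual propagation of hoodie.* writer options into HoodieConfig is assumed:

    // Sketch: pin a custom provider instead of relying on auto-detection.
    df.write.format("hudi")
      .option("hoodie.table.name", "variant_tbl")
      .option("hoodie.parquet.variant.write.shredding.enabled", "true")
      .option("hoodie.parquet.variant.shredding.provider.class",
        "com.example.MyVariantShreddingProvider") // hypothetical custom implementation
      .mode("append")
      .save(basePath)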

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala

Lines changed: 12 additions & 0 deletions
@@ -62,6 +62,18 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
                                                  val schemaSpec: Option[StructType],
                                                  val isBootstrap: Boolean
                                                 ) extends SparkAdapterSupport with HoodieHadoopFsRelationFactory with Logging {
+  // Propagate Hudi's variant allow-reading-shredded config to Spark's SQLConf.
+  // ParquetToSparkSchemaConverter reads this from SQLConf.get(), so it must be set
+  // here, during table resolution, before query execution starts.
+  if (HoodieSparkUtils.gteqSpark4_0) {
+    val sqlConf = sqlContext.sparkSession.sessionState.conf
+    val hoodieParquetAllowReadingShreddedConfKey = "hoodie.parquet.variant.allow.reading.shredded"
+    val allowReadingShredded = options.getOrElse(
+      hoodieParquetAllowReadingShreddedConfKey,
+      sqlConf.getConfString(hoodieParquetAllowReadingShreddedConfKey, "true"))
+    sqlConf.setConfString("spark.sql.variant.allowReadingShredded", allowReadingShredded)
+  }
 
   protected lazy val sparkSession: SparkSession = sqlContext.sparkSession
   protected lazy val optParams: Map[String, String] = options
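
On a Spark 4.x session this propagation can be observed after a Hudi read is resolved. A sketch, where basePath is a placeholder and hoodie.parquet.variant.allow.reading.shredded is assumed to be left at its default of true:

    // Sketch: the Hudi-level setting should now be mirrored into Spark's SQLConf,
    // where ParquetToSparkSchemaConverter picks it up.
    val df = spark.read.format("hudi").load(basePath)
    println(spark.conf.get("spark.sql.variant.allowReadingShredded")) // expected: "true"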

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala

Lines changed: 166 additions & 0 deletions
@@ -23,6 +23,10 @@ import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.StringUtils
 
+import org.apache.hadoop.fs.{FileSystem, Path => HadoopPath}
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.schema.{GroupType, MessageType, Type}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 
 
@@ -171,4 +175,166 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
 
     spark.sql(s"drop table $tableName")
   }
+
+  test("Test Shredded Variant Write and Read + Validate Parquet Schema after Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+
+    // Test 1: Shredding enabled with forced schema → parquet should have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql("set hoodie.parquet.variant.write.shredding.enabled = true")
+      spark.sql("set hoodie.parquet.variant.allow.reading.shredded = true")
+      spark.sql("set hoodie.parquet.variant.force.shredding.schema.for.test = a int, b string")
+
+      spark.sql(
+        s"""
+           |insert into $tableName values
+           | (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+       """.stripMargin)
+      checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+        Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+      )
+
+      // Verify parquet schema has shredded structure with typed_value
+      val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+      assert(parquetFiles.nonEmpty, "Should have at least one data parquet file")
+
+      parquetFiles.foreach { filePath =>
+        val schema = readParquetSchema(filePath)
+        val variantGroup = getFieldAsGroup(schema, "v")
+        assert(groupContainsField(variantGroup, "typed_value"),
+          s"Shredded variant should have typed_value field. Schema:\n$variantGroup")
+        val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+        assert(valueField.getRepetition == Type.Repetition.OPTIONAL,
+          "Shredded variant value field should be OPTIONAL")
+        val metadataField = variantGroup.getType(variantGroup.getFieldIndex("metadata"))
+        assert(metadataField.getRepetition == Type.Repetition.REQUIRED,
+          "Shredded variant metadata field should be REQUIRED")
+      }
+    })
+  }
+
+  test("Test Unshredded Variant Write and Read + Validate Parquet Schema after Write") {
+    assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+    // Shredding disabled: parquet should NOT have typed_value
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  v variant,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(s"set hoodie.parquet.variant.write.shredding.enabled = false")
+
+      spark.sql(
+        s"""
+           |insert into $tableName values
+           | (1, 'row1', parse_json('{"a": 1, "b": "hello"}'), 1000)
+       """.stripMargin)
+
+      checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+        Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+      )
+
+      // Verify parquet schema does NOT have typed_value
+      val parquetFiles = listDataParquetFiles(tmp.getCanonicalPath)
+      assert(parquetFiles.nonEmpty, "Should have at least one data parquet file")
+
+      parquetFiles.foreach { filePath =>
+        val schema = readParquetSchema(filePath)
+        val variantGroup = getFieldAsGroup(schema, "v")
+        assert(!groupContainsField(variantGroup, "typed_value"),
+          s"Non-shredded variant should NOT have typed_value field. Schema:\n$variantGroup")
+        val valueField = variantGroup.getType(variantGroup.getFieldIndex("value"))
+        assert(valueField.getRepetition == Type.Repetition.REQUIRED,
+          "Non-shredded variant value field should be REQUIRED")
+      }
+
+      // Verify data can still be read back for the non-shredded case
+      checkAnswer(s"select id, name, cast(v as string), ts from $tableName order by id")(
+        Seq(1, "row1", "{\"a\":1,\"b\":\"hello\"}", 1000)
+      )
+    })
+  }
+
+  /**
+   * Lists data parquet files in the table directory, excluding Hudi metadata files.
+   */
+  private def listDataParquetFiles(tablePath: String): Seq[String] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val fs = FileSystem.get(new HadoopPath(tablePath).toUri, conf)
+    val iter = fs.listFiles(new HadoopPath(tablePath), true)
+    val files = scala.collection.mutable.ArrayBuffer[String]()
+    while (iter.hasNext) {
+      val file = iter.next()
+      val path = file.getPath.toString
+      if (path.endsWith(".parquet") && !path.contains(".hoodie")) {
+        files += path
+      }
+    }
+    files.toSeq
+  }
+
+  /**
+   * Reads the Parquet schema (MessageType) from a parquet file.
+   */
+  private def readParquetSchema(filePath: String): MessageType = {
+    val conf = spark.sparkContext.hadoopConfiguration
+    val inputFile = HadoopInputFile.fromPath(new HadoopPath(filePath), conf)
+    val reader = ParquetFileReader.open(inputFile)
+    try {
+      reader.getFooter.getFileMetaData.getSchema
+    } finally {
+      reader.close()
+    }
+  }
+
+  /**
+   * Gets a named field from a GroupType (MessageType) and returns it as a GroupType.
+   * Uses getFieldIndex(String) + getType(int) to avoid Scala overload resolution issues.
+   */
+  private def getFieldAsGroup(parent: GroupType, fieldName: String): GroupType = {
+    val idx: Int = parent.getFieldIndex(fieldName)
+    parent.getType(idx).asGroupType()
+  }
+
+  /**
+   * Checks whether a GroupType contains a field with the given name.
+   * Uses try/catch on getFieldIndex to avoid Scala-Java collection converter dependencies.
+   */
+  private def groupContainsField(group: GroupType, fieldName: String): Boolean = {
+    try {
+      group.getFieldIndex(fieldName)
+      true
+    } catch {
+      case _: Exception => false
+    }
+  }
 }
