Commit cd2f826

feat(schema): Add read + write support for shredded variants for AVRO

- Added support to write shredded types for HoodieRecordType.AVRO
- Added functional tests for testing newly added configs

1 parent: d4ba92b

File tree

12 files changed: +2438 −12 lines


hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java

Lines changed: 10 additions & 0 deletions
@@ -198,6 +198,16 @@ public class HoodieStorageConfig extends HoodieConfig {
           + "When disabled, only unshredded variant data can be read. "
           + "Equivalent to Spark's spark.sql.variant.allowReadingShredded.");
 
+  public static final ConfigProperty<String> PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS = ConfigProperty
+      .key("hoodie.parquet.variant.shredding.provider.class")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Fully-qualified class name of the VariantShreddingProvider implementation "
+          + "used to shred variant values at write time in the Avro record path. "
+          + "The provider parses variant binary data and populates typed_value columns. "
+          + "When not set, the provider is auto-detected from the classpath.");
+
   public static final ConfigProperty<Boolean> WRITE_UTC_TIMEZONE = ConfigProperty
       .key("hoodie.parquet.write.utc-timezone.enabled")
       .defaultValue(true)
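With the new config key above, a writer can either rely on classpath auto-detection or pin a provider explicitly. A minimal sketch (not part of this commit) of setting the key through plain Properties; the provider class name shown is hypothetical:

import java.util.Properties;

public class VariantShreddingConfigExample {
  public static void main(String[] args) {
    Properties writerProps = new Properties();
    // Pin an explicit provider instead of relying on classpath auto-detection.
    // "com.example.MyVariantShreddingProvider" is a hypothetical implementation class.
    writerProps.setProperty("hoodie.parquet.variant.shredding.provider.class",
        "com.example.MyVariantShreddingProvider");
    // Pass writerProps along with the rest of the Hudi write configuration.
    System.out.println(writerProps);
  }
}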

hudi-hadoop-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java

Lines changed: 432 additions & 3 deletions
Large diffs are not rendered by default.
hudi-hadoop-common/src/main/java/org/apache/hudi/avro/VariantShreddingProvider.java

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Interface for shredding variant values at write time.
+ * <p>
+ * Implementations parse variant binary data (value + metadata bytes) and produce
+ * a shredded {@link GenericRecord} with typed_value columns populated according
+ * to the shredding schema.
+ * <p>
+ * This interface allows the variant binary parsing logic (which may depend on
+ * engine-specific libraries like Spark's variant module) to be loaded via reflection,
+ * keeping the core write support free of engine-specific dependencies.
+ */
+public interface VariantShreddingProvider {
+
+  /**
+   * Transform an unshredded variant GenericRecord into a shredded one.
+   * <p>
+   * The input record is expected to have:
+   * <ul>
+   *   <li>{@code value}: ByteBuffer containing the variant value binary</li>
+   *   <li>{@code metadata}: ByteBuffer containing the variant metadata binary</li>
+   * </ul>
+   * <p>
+   * The output record should conform to {@code shreddedSchema} and have:
+   * <ul>
+   *   <li>{@code value}: ByteBuffer or null (null when typed_value captures the full value)</li>
+   *   <li>{@code metadata}: ByteBuffer (always present)</li>
+   *   <li>{@code typed_value}: the typed representation extracted from the variant binary,
+   *       or null if the variant type does not match the typed_value schema</li>
+   * </ul>
+   *
+   * @param unshreddedVariant GenericRecord with {value: ByteBuffer, metadata: ByteBuffer}
+   * @param shreddedSchema    target Avro schema with {value: nullable ByteBuffer, metadata: ByteBuffer, typed_value: type}
+   * @param variantSchema     HoodieSchema.Variant containing the shredding schema information
+   * @return a GenericRecord conforming to shreddedSchema with typed_value populated where possible
+   */
+  GenericRecord shredVariantRecord(
+      GenericRecord unshreddedVariant,
+      Schema shreddedSchema,
+      HoodieSchema.Variant variantSchema);
+}
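As a rough illustration of this contract (not part of the commit), a trivial provider could carry the binaries through unchanged and never populate typed_value, which the javadoc above explicitly allows. The class name and behavior here are hypothetical; only the interface signature and Avro's GenericData API are taken as given:

package org.apache.hudi.avro;

import org.apache.hudi.common.schema.HoodieSchema;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

/** Hypothetical pass-through provider: keeps the variant fully in the binary value field. */
public class PassThroughVariantShreddingProvider implements VariantShreddingProvider {
  @Override
  public GenericRecord shredVariantRecord(GenericRecord unshreddedVariant,
                                          Schema shreddedSchema,
                                          HoodieSchema.Variant variantSchema) {
    GenericRecord shredded = new GenericData.Record(shreddedSchema);
    // Carry the raw binaries over unchanged.
    shredded.put("value", unshreddedVariant.get("value"));
    shredded.put("metadata", unshreddedVariant.get("metadata"));
    // typed_value stays null: no typed representation is extracted here.
    shredded.put("typed_value", null);
    return shredded;
  }
}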

hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java

Lines changed: 30 additions & 1 deletion
@@ -49,6 +49,7 @@
 import java.util.Properties;
 
 import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WRITER_TO_ALLOW_DUPLICATES;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS;
 import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
 
 public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
@@ -123,9 +124,37 @@ protected HoodieFileWriter newOrcFileWriter(
   private HoodieAvroWriteSupport getHoodieAvroWriteSupport(HoodieSchema schema,
                                                            HoodieConfig config, boolean enableBloomFilter) {
     Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
+    HoodieSchema effectiveSchema = HoodieAvroWriteSupport.generateEffectiveSchema(schema, config);
+    Properties props = config.getProps();
+    // Auto-detect variant shredding provider from classpath if not explicitly configured
+    if (!props.containsKey(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key())) {
+      String detected = detectShreddingProvider();
+      if (detected != null) {
+        props.setProperty(PARQUET_VARIANT_SHREDDING_PROVIDER_CLASS.key(), detected);
+      }
+    }
     return (HoodieAvroWriteSupport) ReflectionUtils.loadClass(
         config.getStringOrDefault(HoodieStorageConfig.HOODIE_AVRO_WRITE_SUPPORT_CLASS),
         new Class<?>[] {MessageType.class, HoodieSchema.class, Option.class, Properties.class},
-        getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(schema), schema, filter, config.getProps());
+        getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(effectiveSchema), schema, filter, props);
+  }
+
+  /**
+   * Auto-detect a {@link org.apache.hudi.avro.VariantShreddingProvider} implementation
+   * available on the classpath. Returns the fully-qualified class name if found, or null.
+   */
+  private static String detectShreddingProvider() {
+    String[] candidates = {
+        "org.apache.hudi.variant.Spark4VariantShreddingProvider"
+    };
+    for (String candidate : candidates) {
+      try {
+        Class.forName(candidate);
+        return candidate;
+      } catch (ClassNotFoundException e) {
+        // not on classpath, try next
+      }
+    }
+    return null;
   }
 }
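The detected (or configured) provider class name ends up in the Properties handed to HoodieAvroWriteSupport, and per the interface javadoc it is meant to be instantiated via reflection on the write-support side. That consumption code sits in the large HoodieAvroWriteSupport diff not rendered above, so the following is only a sketch of how such loading might look, with the helper name being an assumption:

import java.util.Properties;

import org.apache.hudi.avro.VariantShreddingProvider;

public class ProviderLoadingSketch {
  // Hypothetical helper: resolve the configured provider, or null when none is set.
  static VariantShreddingProvider loadProvider(Properties props) throws Exception {
    String className = props.getProperty("hoodie.parquet.variant.shredding.provider.class");
    if (className == null) {
      return null; // no provider configured or detected; variants stay unshredded
    }
    // Instantiate via reflection so engine-specific providers stay optional at compile time.
    return (VariantShreddingProvider) Class.forName(className)
        .getDeclaredConstructor()
        .newInstance();
  }
}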
