
Commit ec944cc

cashmand and cloud-fan committed
[SPARK-53659][SQL] Infer Variant shredding schema when writing to Parquet
### What changes were proposed in this pull request?

When writing Variant to Parquet, we want the shredding schema to adapt to the data being written on a per-file basis. This PR adds a new output writer that buffers the first few rows before starting the write, uses the content of those rows to determine a shredding schema, and only then creates the Parquet writer with that schema.

The heuristics for determining the shredding schema are currently fairly simple: if a field appears consistently and always with the same type, we create `value` and `typed_value`; if it appears with an inconsistent type, we only create `value`. We drop fields that occur in fewer than 10% of sampled rows, and cap the schema at 300 total fields (counting `value` and `typed_value` separately) to avoid creating excessively wide Parquet schemas, which can cause performance issues.

### Why are the changes needed?

Allows Spark to make use of the [Variant shredding spec](https://github.com/apache/parquet-format/blob/master/VariantShredding.md) without requiring the user to manually set a shredding schema.

### Does this PR introduce _any_ user-facing change?

Only if `spark.sql.variant.inferShreddingSchema` and `spark.sql.variant.writeShredding.enabled` are both set to true. Both are currently false by default.

### How was this patch tested?

Unit tests in the PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52406 from cashmand/variant_shredding_inference.

Lead-authored-by: cashmand <david.cashman@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
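
The heuristic described above can be sketched roughly as follows. This is an illustrative sketch only, not code from the PR: `ShreddingInferenceSketch`, `FieldStats`, and `inferShreddedFields` are hypothetical names, and the real writer operates on buffered rows and produces an actual Parquet shredding schema rather than a map. It only encodes the stated rules: a consistently typed field gets `value` and `typed_value`, an inconsistently typed field gets only `value`, fields seen in fewer than 10% of sampled rows are dropped, and the total field count is capped (300 by default).

```scala
// Hypothetical sketch of the per-field decision described in the PR text above.
object ShreddingInferenceSketch {
  // Statistics gathered for one field while buffering the first few rows.
  case class FieldStats(occurrences: Int, observedTypes: Set[String])

  // Returns the fields to shred, mapped to whether a typed_value column is
  // created in addition to value (true = a single consistent type was observed).
  def inferShreddedFields(
      stats: Map[String, FieldStats],
      sampledRows: Int,
      maxWidth: Int = 300): Map[String, Boolean] = {
    var width = 0
    val result = scala.collection.mutable.LinkedHashMap.empty[String, Boolean]
    // Drop fields that appear in fewer than 10% of the sampled rows.
    for ((field, s) <- stats if s.occurrences >= 0.1 * sampledRows) {
      val consistent = s.observedTypes.size == 1
      val cost = if (consistent) 2 else 1 // value and typed_value count separately
      if (width + cost <= maxWidth) {     // respect the overall width cap
        result(field) = consistent
        width += cost
      }
    }
    result.toMap
  }
}
```

For example, with 100 sampled rows, a field seen 95 times only as a long would get both `value` and `typed_value`, a field seen with both long and string values would get only `value`, and a field seen 3 times would be dropped.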
1 parent 65755bd · commit ec944cc

7 files changed: +1204 −1 lines changed


common/variant/src/main/java/org/apache/spark/types/variant/Variant.java

Lines changed: 5 additions & 0 deletions
@@ -99,6 +99,11 @@ public BigDecimal getDecimal() {
     return VariantUtil.getDecimal(value, pos);
   }
 
+  // Get the decimal value, including trailing zeros
+  public BigDecimal getDecimalWithOriginalScale() {
+    return VariantUtil.getDecimalWithOriginalScale(value, pos);
+  }
+
   // Get a float value from the variant.
   public float getFloat() {
     return VariantUtil.getFloat(value, pos);
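
For context on the new accessor: its comment says it returns the decimal "including trailing zeros", i.e. with the scale the value was written with. Below is a plain `java.math.BigDecimal` snippet (REPL-style, not the Variant API); the assumption that the existing `getDecimal` path normalizes the scale away is inferred from that comment, not stated in this diff.

```scala
// "1.100" written with scale 3; stripping trailing zeros yields scale 1.
val asWritten = new java.math.BigDecimal("1.100")
val stripped = asWritten.stripTrailingZeros()
assert(asWritten.compareTo(stripped) == 0)              // numerically equal
assert(asWritten.scale() == 3 && stripped.scale() == 1) // but the scales differ
```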

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 25 additions & 0 deletions
@@ -5422,6 +5422,31 @@ object SQLConf {
       .stringConf
       .createWithDefault("")
 
+  val VARIANT_SHREDDING_MAX_SCHEMA_WIDTH =
+    buildConf("spark.sql.variant.shredding.maxSchemaWidth")
+      .internal()
+      .doc("Maximum number of shredded fields to create when inferring a schema for Variant")
+      .version("4.1.0")
+      .intConf
+      .createWithDefault(300)
+
+  val VARIANT_SHREDDING_MAX_SCHEMA_DEPTH =
+    buildConf("spark.sql.variant.shredding.maxSchemaDepth")
+      .internal()
+      .doc("Maximum depth in Variant value to traverse when inferring a schema. " +
+        "Any array/object below this depth will be shredded as a single binary.")
+      .version("4.1.0")
+      .intConf
+      .createWithDefault(50)
+
+  val VARIANT_INFER_SHREDDING_SCHEMA =
+    buildConf("spark.sql.variant.inferShreddingSchema")
+      .internal()
+      .doc("Infer shredding schema when writing Variant columns in Parquet tables.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK =
     buildConf("spark.sql.legacy.csv.enableDateTimeParsingFallback")
       .internal()
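
A REPL-style sketch of how these switches fit together once the feature lands; the `spark` session, `df` (a DataFrame with a VARIANT column), and the output path are placeholders, not taken from the PR.

```scala
// Both flags below default to false per the PR description; inference only
// applies when shredded writes are enabled as well.
spark.conf.set("spark.sql.variant.writeShredding.enabled", "true")
spark.conf.set("spark.sql.variant.inferShreddingSchema", "true")
// The bounds added to SQLConf above can also be tuned (defaults shown).
spark.conf.set("spark.sql.variant.shredding.maxSchemaWidth", "300")
spark.conf.set("spark.sql.variant.shredding.maxSchemaDepth", "50")
df.write.parquet("/tmp/variant_shredded_output") // placeholder output path
```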
