
Add support for partitioned Parquet writes #3193

@andygrove

Description

What is the problem the feature request solves?

Summary

Add support for partitioned writes in Comet's native Parquet write path.

Problem Description

Currently, native Parquet write support is blocked for any write operation involving partitioned tables. This is enforced in CometDataWritingCommand.scala at lines 66-68:

if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) {
  return Unsupported(Some("Partitioned writes are not supported"))
}

Additionally, dynamic partition overwrite is hardcoded to false at line 196:

java.lang.Boolean.FALSE // dynamicPartitionOverwrite = false for now

This limitation prevents native Parquet writes from being used with partitioned tables, which are a fundamental pattern in data lakes and modern data warehousing.
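
Rather than hardcoding FALSE, the flag would eventually need to be derived the same way Spark's InsertIntoHadoopFsRelationCommand derives it. A rough paraphrase (a sketch, not the exact Spark source):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode

// Rough paraphrase of Spark's logic: dynamic partition overwrite applies only
// when the save mode is Overwrite, the session is configured for dynamic
// overwrite, and at least one partition column is not pinned to a static value.
def dynamicPartitionOverwrite(
    mode: SaveMode,
    overwriteMode: PartitionOverwriteMode.Value,
    staticPartitions: Map[String, String],
    partitionColumns: Seq[String]): Boolean =
  mode == SaveMode.Overwrite &&
    overwriteMode == PartitionOverwriteMode.DYNAMIC &&
    staticPartitions.size < partitionColumns.length
```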

Current Behavior

When attempting to write partitioned data using native Parquet writes:
- Any operation with partitionBy() falls back to Spark's default writer
- Static partition writes (e.g., INSERT INTO table PARTITION(year=2024)) fall back
- Dynamic partition writes fall back
- Native write acceleration cannot be used for partitioned tables

Expected Behavior

Native Parquet writes should support:
- Dynamic partitioning - Automatically partition data based on column values
- Static partitioning - Write to specific partition(s) specified by user
- Dynamic partition overwrite - Overwrite only the partitions present in the data (see the sketch after this list)
- Static partition overwrite - Overwrite specific partitions
- Multi-level partitioning - Support multiple partition columns (e.g., year/month/day)
- Partition pruning - Efficient handling of partition directories
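
To make the overwrite semantics concrete, here is a small sketch using plain Spark APIs (paths and column names are made up for the example); the native writer would need to reproduce this behavior:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Seed a table with two partitions: year=2023 and year=2024.
Seq((2023, "a"), (2024, "b")).toDF("year", "value")
  .write.partitionBy("year").parquet("/tmp/events")

// New data only touches year=2024.
val update = Seq((2024, "c")).toDF("year", "value")

// Dynamic overwrite: only the partitions present in `update` (year=2024) are
// replaced; the year=2023 directory is left untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
update.write.mode("overwrite").partitionBy("year").parquet("/tmp/events")

// Static overwrite (the default) would instead replace the entire table path,
// dropping year=2023 as well.
```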

Impact

This is a critical blocker preventing native Parquet writes from being enabled by default in production. Without partitioned write support:
- Cannot use native writes with partitioned Hive tables, Delta Lake, or Apache Iceberg tables
- Incompatible with standard data lake partitioning patterns (e.g., date-based partitioning)
- Cannot leverage partition pruning benefits for query optimization
- Severely limits applicability to enterprise use cases

Partitioned tables are the norm in production Spark deployments, making this feature essential for production readiness.

Technical Context

Affected Files:
- spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala:66-68 - Where the check blocks partitioned writes
- spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala:196 - Dynamic partition overwrite hardcoded to false
- native/core/src/execution/operators/parquet_writer.rs - Native writer implementation that needs partitioning logic

Implementation Requirements:
- Partition column extraction from data rows
- Dynamic directory structure creation (e.g., year=2024/month=01/)
- Partition value encoding in directory names (a sketch follows this list)
- Support for both Hive-style (col=value) and non-Hive-style partitioning
- Integration with FileCommitProtocol for atomic partition writes
- Handling of partition schema vs data schema separation
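
The directory-naming and value-encoding requirements above amount to something like the following sketch (illustrative only, not the Comet implementation; the escape set is abbreviated). Spark's own escaping lives in ExternalCatalogUtils.escapePathName, and the native writer would need to produce identical paths so that partition discovery on read stays compatible:

```scala
// Hypothetical helper showing Hive-style partition directories (col=value) with
// percent-encoding of characters that are unsafe in a path segment. The real
// escape set is larger; this is an abbreviated illustration.
object PartitionPaths {
  private val unsafe: Set[Char] = "\"#%'*/:=?\\{}[]^".toSet

  private def escape(value: String): String =
    value.flatMap { c =>
      if (unsafe.contains(c) || c.isControl) f"%%${c.toInt}%02X" else c.toString
    }

  /** Builds e.g. "year=2024/month=01" from ordered (column, value) pairs. */
  def partitionDir(values: Seq[(String, String)]): String =
    values.map { case (col, v) => s"$col=${escape(v)}" }.mkString("/")
}

// PartitionPaths.partitionDir(Seq("year" -> "2024", "month" -> "01"))  // "year=2024/month=01"
// PartitionPaths.partitionDir(Seq("dept" -> "a/b"))                    // "dept=a%2Fb"
```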

Considerations:
- Partition discovery on read must remain compatible
- Small file problem - many partitions can create many small files (a mitigation sketch follows this list)
- Partition column ordering matters for query performance
- Special characters in partition values need proper encoding
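
The small-file concern is not new to Comet; the usual mitigation today (assuming a SparkSession `spark` and a DataFrame `df` with year/month columns, as in the examples later in this issue) is to cluster the data on the partition columns before writing and to cap file size:

```scala
import org.apache.spark.sql.functions.col

// Cap the number of records per written file (splits oversized files).
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000L)

// Hash-partition on the partition columns so all rows for a given
// (year, month) land in a single task, yielding one file per partition
// directory instead of one per task.
df.repartition(col("year"), col("month"))
  .write
  .partitionBy("year", "month")
  .parquet("/path/to/table")
```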

Related Work

This is part of making native Parquet writes production-ready. Other blockers include:
- Complex types support (#TBD)
- Cloud storage support (S3/GCS/Azure) (#TBD)
- Complete FileCommitProtocol implementation (#TBD)

Acceptance Criteria

- Support dynamic partitioning with single partition column
- Support dynamic partitioning with multiple partition columns
- Support static partition writes
- Support dynamic partition overwrite mode
- Support static partition overwrite mode
- Proper Hive-style partition directory naming (col=value)
- Handle special characters in partition values correctly
- Test coverage for all partitioning modes (a round-trip sketch follows this list)
- Integration tests with common partition patterns (date-based, etc.)
- Verify compatibility with Hive metastore partition metadata
- Performance benchmarks show native partitioned writes outperform Spark default
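
As a minimal sketch of what such a test could look like (plain SparkSession and ScalaTest rather than Comet's test harness; suite name, paths, and data are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class PartitionedWriteRoundTripSuite extends AnyFunSuite {
  test("dynamic partitioning round-trips values with special characters") {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val dir = java.nio.file.Files.createTempDirectory("partitioned").toString
    val input = Seq((2024, "a/b", 1L), (2024, "plain", 2L), (2023, "c:d", 3L))
      .toDF("year", "dept", "value")

    // Two partition columns; the dept values contain characters that must be
    // escaped in the directory names.
    input.write.partitionBy("year", "dept").parquet(dir)

    // Reading back must recover both the data and the decoded partition values.
    val result = spark.read.parquet(dir)
    assert(result.count() === 3)
    assert(result.select("dept").as[String].collect().toSet === Set("a/b", "plain", "c:d"))

    spark.stop()
  }
}
```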

Example Use Cases

```scala
// Dynamic partitioning
df.write
  .partitionBy("year", "month", "day")
  .parquet("/path/to/table")

// Static partition write (DataFrameWriter has no PARTITION clause, so use SQL;
// the staging view holds only the non-partition columns)
df.createOrReplaceTempView("staging")
spark.sql("INSERT INTO target_table PARTITION (year = 2024, month = 1) SELECT * FROM staging")

// Dynamic partition overwrite
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write
  .mode("overwrite")
  .partitionBy("date")
  .parquet("/path/to/table")
```

Describe the potential solution

No response

Additional context

No response
