
Add support for bucketed Parquet writes #3194

@andygrove

What is the problem the feature request solves?

Add support for bucketed writes in the native Parquet write path.

Problem Description

Currently, native Parquet write support is blocked for any write operation involving bucketed tables. This is enforced in CometDataWritingCommand.scala at lines 62-64:

if (cmd.bucketSpec.isDefined) {
  return Unsupported(Some("Bucketed writes are not supported"))
}

This limitation prevents native Parquet writes from being used with bucketed tables, which are commonly used to optimize join and aggregation performance in Spark SQL.

Current Behavior

When attempting to write bucketed data using native Parquet writes:

  • Any operation with bucket specification falls back to Spark's default writer
  • Tables created with CLUSTERED BY cannot use native writes
  • Native write acceleration cannot be used for bucketed tables

Expected Behavior

Native Parquet writes should support:

  • Hash-based bucketing - Distribute data across buckets using hash function
  • Multiple bucket columns - Support bucketing by multiple columns
  • Sort within buckets - Support SORTED BY clause for intra-bucket ordering
  • Configurable bucket count - Support different numbers of buckets
  • Deterministic bucketing - Ensure same bucketing behavior as Spark's default writer

Impact

This is a medium-priority blocker for production readiness. Without bucketed write support:

  • Cannot use native writes with bucketed tables
  • Cannot leverage bucketing optimization for joins (bucket pruning, bucketed joins)
  • Cannot leverage bucketing optimization for aggregations
  • Tables optimized for specific query patterns cannot benefit from native writes

While not as common as partitioning, bucketing is used in production for:

  • Large fact tables in star schemas
  • Tables frequently joined on the same keys
  • Pre-aggregated data marts
  • Performance-critical analytical workloads

Technical Context

Affected Files:

  • spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala:62-64 - Where the check blocks bucketed writes
  • native/core/src/execution/operators/parquet_writer.rs - Native writer implementation that needs bucketing logic

Implementation Requirements:

  • Extract bucket specification (bucket columns, sort columns, number of buckets)
  • Implement Spark-compatible hash function for bucket assignment
  • Distribute rows to correct bucket files based on hash of bucket columns
  • Support intra-bucket sorting when specified
  • Proper file naming convention for bucketed files, with the bucket id encoded before the file extension (e.g., part-00000-<uuid>_00001.c000.snappy.parquet); see the naming sketch after this list
  • Integration with FileCommitProtocol for atomic bucketed writes
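
A minimal sketch of the expected bucket file naming, assuming the same convention Spark's default writer uses (the helper name and the <uuid> placeholder are illustrative, not part of Comet):

// Spark pads the bucket id to five digits and places it between the job UUID and the
// .c000 file counter, e.g. part-00000-<uuid>_00001.c000.snappy.parquet
def bucketedFileName(taskId: Int, jobUUID: String, bucketId: Int, fileCounter: Int): String =
  f"part-$taskId%05d-$jobUUID" + f"_$bucketId%05d" + f".c$fileCounter%03d" + ".snappy.parquet"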

Bucketing Algorithm:
Spark assigns each row to a bucket using its Murmur3-based hash (the same hash exposed as the hash SQL function, seed 42), taken as a non-negative modulo of the bucket count:
bucket_id = pmod(murmur3_hash(bucket_columns), num_buckets)
The native implementation must use the same hash function and modulo behavior to produce identical bucket assignments.
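
A minimal sketch of how the bucket id expression can be obtained on the JVM side (an assumed approach, not Comet's actual implementation), reusing Spark's own HashPartitioning so the assignment is guaranteed to match Spark's default writer:

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

// HashPartitioning.partitionIdExpression is pmod(murmur3_hash(bucketColumns), numBuckets),
// the same expression Spark's default writer uses for bucket assignment.
def bucketIdExpression(bucketColumns: Seq[Expression], numBuckets: Int): Expression =
  HashPartitioning(bucketColumns, numBuckets).partitionIdExpression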

Considerations:

  • Bucket number must remain stable for the lifetime of the table
  • Bucket file naming must follow Spark conventions
  • Sorting within buckets impacts write performance but improves read performance (see the ordering sketch after this list)
  • Bucketing can be combined with partitioning (bucketed partitioned tables)
  • Need to handle bucket ID assignment correctly to match Spark's behavior
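
A minimal sketch of the sort order the writer needs before emitting rows (an assumption modeled on how Spark's FileFormatWriter orders data): partition columns first, then the bucket id, then any SORTED BY columns, so each output file can be written sequentially.

import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder}

// Rows arriving in this order can be streamed to bucket files one at a time.
def requiredOrdering(
    partitionColumns: Seq[Attribute],
    bucketIdExpression: Option[Expression],
    sortColumns: Seq[Attribute]): Seq[SortOrder] =
  (partitionColumns ++ bucketIdExpression ++ sortColumns).map(SortOrder(_, Ascending))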

Related Work

This is part of making native Parquet writes production-ready. Other blockers include:

  • Complex types support (#TBD)
  • Partitioned writes support (#TBD)
  • Cloud storage support (S3/GCS/Azure) (#TBD)

Acceptance Criteria

  • Support bucketing by single column
  • Support bucketing by multiple columns
  • Support configurable number of buckets
  • Support SORTED BY clause (sort within buckets)
  • Hash function produces same bucket assignments as Spark default writer
  • Correct bucket file naming convention
  • Support combination of bucketing and partitioning
  • Test coverage for various bucketing configurations
  • Roundtrip tests (write + read) verify bucket assignments are correct (see the sketch after this list)
  • Join performance tests show bucket pruning works correctly
  • Performance benchmarks show native bucketed writes match or exceed Spark default
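
A minimal roundtrip check, sketched under the assumption that the native writer is enabled for the session (table and column names are illustrative). It relies on the hash SQL function being the same Murmur3 hash that bucketing uses, so every row must land in the file whose _NNNNN suffix equals pmod(hash(user_id), 4):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, input_file_name, regexp_extract}

val spark = SparkSession.builder().appName("bucketed-write-roundtrip").getOrCreate()
import spark.implicits._

Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("user_id", "name")
  .write
  .bucketBy(4, "user_id")
  .saveAsTable("users_bucketed")

// Compare the bucket id encoded in each file name with the id Spark would compute.
val check = spark.table("users_bucketed")
  .withColumn("file", input_file_name())
  .withColumn("file_bucket", regexp_extract(col("file"), "_(\\d{5})\\.c", 1).cast("int"))
  .withColumn("expected_bucket", expr("pmod(hash(user_id), 4)"))

assert(check.filter(col("file_bucket") =!= col("expected_bucket")).isEmpty)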

Example Use Cases

// Basic bucketed table
df.write
  .bucketBy(4, "user_id")
  .saveAsTable("users_bucketed")

// Bucketed and sorted
df.write
  .bucketBy(8, "customer_id")
  .sortBy("order_date")
  .saveAsTable("orders_bucketed")

// Bucketed on multiple columns
df.write
  .bucketBy(16, "year", "month")
  .saveAsTable("events_bucketed")

// Combination of bucketing and partitioning
df.write
  .partitionBy("date")
  .bucketBy(32, "user_id")
  .saveAsTable("user_events")
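
// Read-side payoff (illustrative; assumes both tables were bucketed on the join key
// with the same bucket count): a sort-merge join between them should plan without an
// Exchange on either side.
val users  = spark.table("users_bucketed")    // bucketBy(4, "user_id")
val events = spark.table("events_by_user")    // hypothetical table, also bucketBy(4, "user_id")
users.join(events, "user_id").explain()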

Describe the potential solution

No response

Additional context

No response
