Skip to content

Commit fef4c22

Browse files
committed
fix review findings, asked Claude to rewrite docs of KeyedPartitioning
1 parent 91c38e0 commit fef4c22

File tree

2 files changed

+48
-19
lines changed

2 files changed

+48
-19
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -349,26 +349,54 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
349349

350350
/**
351351
* Represents a partitioning where rows are split across partitions based on transforms defined by
352-
* `expressions`. `partitionKeys`, should contain value of partition key(s) in ascending order,
353-
* after evaluated by the transforms in `expressions`, for each input partition.
354-
* `partitionKeys` might not be unique when this partitioning is returned from a data source, but
355-
* the `GroupPartitionsExec` operator can group partitions with the same key and so make
356-
* `partitionKeys` unique.
352+
* `expressions`.
357353
*
358-
* The `originalPartitionKeys`, on the other hand, are partition values from the original input
359-
* splits returned by data sources. It may contain duplicated values.
354+
* == Partition Keys ==
355+
* This partitioning has two sets of partition keys:
360356
*
361-
* For example, if a data source reports partition transform expressions `[years(ts_col)]` with 4
362-
* input splits whose corresponding partition values are `[0, 1, 2, 2]`, then the `expressions` in
363-
* this case is `[years(ts_col)]`, while both `partitionKeys` and `originalPartitionKeys` are
364-
* `[0, 1, 2, 2]`.
365-
* After placing a `GroupPartitionsExec` operator on top of the data source, `partitionKeys` becomes
366-
* `[0, 1, 2]` but `originalPartitionKeys` remains `[0, 1, 2, 2]`.
357+
* - `partitionKeys`: The current partition key for each partition, in ascending order. May contain
358+
* duplicates when first created from a data source, but becomes unique after grouping.
367359
*
368-
* @param expressions Partition expressions for the partitioning.
369-
* @param partitionKeys The keys for the partitions, must be in ascending order.
370-
* @param originalPartitionKeys The original partition keys before any grouping has been applied by
371-
* a `GroupPartitionsExec` operator, must be in ascending order.
360+
* - `originalPartitionKeys`: The original partition keys from the data source, in ascending order.
361+
* Always preserves the original values, even after grouping. Used to track the original
362+
* distribution for optimization purposes.
363+
*
364+
* == Grouping State ==
365+
* A KeyedPartitioning can be in two states:
366+
*
367+
* - '''Ungrouped''' (when `isGrouped == false`): `partitionKeys` contains duplicates. Multiple
368+
* input partitions share the same key. This is the initial state when created from a data source.
369+
*
370+
* - '''Grouped''' (when `isGrouped == true`): `partitionKeys` contains only unique values. Each
371+
* partition has a distinct key. This state is achieved by applying `GroupPartitionsExec`, which
372+
* coalesces partitions with the same key.
373+
*
374+
* == Example ==
375+
* Consider a data source with partition transform `[years(ts_col)]` and 4 input splits:
376+
*
377+
* '''Before GroupPartitionsExec''' (ungrouped):
378+
* {{{
379+
* expressions: [years(ts_col)]
380+
* partitionKeys: [0, 1, 2, 2] // partition 2 and 3 have the same key
381+
* originalPartitionKeys: [0, 1, 2, 2]
382+
* numPartitions: 4
383+
* isGrouped: false
384+
* }}}
385+
*
386+
* '''After GroupPartitionsExec''' (grouped):
387+
* {{{
388+
* expressions: [years(ts_col)]
389+
* partitionKeys: [0, 1, 2] // duplicates removed, partitions coalesced
390+
* originalPartitionKeys: [0, 1, 2, 2] // unchanged, preserves original distribution
391+
* numPartitions: 3
392+
* isGrouped: true
393+
* }}}
394+
*
395+
* @param expressions Partition transform expressions (e.g., `years(col)`, `bucket(10, col)`).
396+
* @param partitionKeys Current partition keys, one per partition, in ascending order.
397+
* May contain duplicates before grouping.
398+
* @param originalPartitionKeys Original partition keys from the data source, in ascending order.
399+
* Preserves the initial distribution even after grouping.
372400
*/
373401
case class KeyedPartitioning(
374402
expressions: Seq[Expression],

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
2020

2121
import scala.collection.mutable.ArrayBuffer
2222

23-
import org.apache.spark.Partition
23+
import org.apache.spark.{Partition, SparkException}
2424
import org.apache.spark.rdd.{CoalescedRDD, PartitionCoalescer, PartitionGroup, RDD}
2525
import org.apache.spark.sql.catalyst.InternalRow
2626
import org.apache.spark.sql.catalyst.expressions._
@@ -94,7 +94,8 @@ case class GroupPartitionsExec(
9494
lazy val firstKeyedPartitioning = {
9595
child.outputPartitioning.asInstanceOf[Partitioning with Expression].collectFirst {
9696
case k: KeyedPartitioning => k
97-
}.get
97+
}.getOrElse(
98+
throw new SparkException("GroupPartitionsExec requires a child with KeyedPartitioning"))
9899
}
99100

100101
/**

0 commit comments

Comments
 (0)