
[SPARK-55535][SPARK-55092][SQL] Refactor KeyGroupedPartitioning and Storage Partition Join#54330

Open
peter-toth wants to merge 6 commits intoapache:masterfrom
peter-toth:SPARK-55535-refactor-kgp-and-spj

Conversation

@peter-toth
Contributor

@peter-toth peter-toth commented Feb 15, 2026

What changes were proposed in this pull request?

This PR extracts the partition grouping logic from BatchScanExec into a new GroupPartitionsExec operator and replaces KeyGroupedPartitioning with KeyedPartitioning.

  • KeyedPartitioning represents a partitioning where the partition keys are known. It may or may not be grouped (clustered) by partition keys. When grouping is required, the new operator can be inserted into a plan at any place (similarly to how exchanges are inserted to satisfy expected distributions), creating the necessary grouped/replicated partitions by key.
  • The implementation of GroupPartitionsExec uses the already existing CoalescedRDD with a new GroupedPartitionCoalescer.
  • This PR largely restores DataSourceRDD to its pre-SPJ form, but the change revealed a bug in custom metrics reporting. It turned out that if there is a coalesce operation right after a scan (which is the basis of how partition grouping works in this implementation, but can also happen without this PR when a coalesce is explicitly added to the plan), then multiple partitions of the scan (i.e. multiple partition groups before this PR) can belong to a given Spark task, because the number of tasks is defined by the coalesce operation. This means that DataSourceRDD.compute() can be called multiple times from one task, and each call adds a new TaskCompletionListener, which leads to conflicting metrics reports (one report per scan partition). This PR fixes the bug by using a ThreadLocal to track the current scan partition reader and install only one listener per task (a minimal sketch of this idea is shown after this list).
  • This PR tries to unify the terminology and prefers using PartitionKey instead of the previous PartitionValues to be in sync with the DSv2 HasPartitionKey interface.
  • After this PR, StoragePartitionJoinParams is no longer required in BatchScanExec; its fields are now part of the new GroupPartitionsExec operator.
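
The following is a minimal, illustrative sketch of the ThreadLocal idea described above (only readerStateThreadLocal appears in the PR's diff; the other names are hypothetical, and the PR's actual ReaderState carries more, e.g. metrics):

import org.apache.spark.TaskContext

// Illustrative only: a simplified holder for the currently active reader.
private class ReaderStateSketch(var reader: AutoCloseable)

private object ScanReaderTrackingSketch {
  // One slot per task thread, cleared when the task completes.
  private val readerStateThreadLocal = new ThreadLocal[ReaderStateSketch]

  // Called from every DataSourceRDD.compute() invocation with the new reader.
  def register(newReader: AutoCloseable, context: TaskContext): Unit = {
    val state = readerStateThreadLocal.get()
    if (state == null) {
      // First compute() in this task: install exactly one completion listener.
      readerStateThreadLocal.set(new ReaderStateSketch(newReader))
      context.addTaskCompletionListener[Unit] { _ =>
        // Metrics reporting omitted for brevity; only the close is shown here.
        Option(readerStateThreadLocal.get()).foreach(_.reader.close())
        readerStateThreadLocal.remove()
      }
    } else {
      // Subsequent compute() in the same task: close the previous reader now
      // and reuse the already-installed listener for the new one.
      state.reader.close()
      state.reader = newReader
    }
  }
}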

Why are the changes needed?

To solve the issue of unnecessary partition grouping (#53859) and to simplify the KGP/SPJ implementation.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs adjusted and new UTs from #53859.

Was this patch authored or co-authored using generative AI tooling?

Yes, documentation and some helpers were added by Claude.

case h: HashPartitioningLike => expandOutputPartitioning(h)
case c: PartitioningCollection => expandOutputPartitioning(c)
case other => other
val expandedPartitioning = expandOutputPartitioning(streamedPlan.outputPartitioning)
Contributor Author


BroadcastHashJoinExec-related changes have been extracted to a separate PR (#54335), since those changes are valid on their own.

@peter-toth peter-toth force-pushed the SPARK-55535-refactor-kgp-and-spj branch from 0c0e88b to c1e3e93 Compare February 17, 2026 20:21
@peter-toth peter-toth changed the title [WIP][SPARK-55535][SQL] Refactor KeyGroupedPartitioning and Storage Partition Join [WIP][SPARK-55535][SPARK-55092][SQL] Refactor KeyGroupedPartitioning and Storage Partition Join Feb 18, 2026
@peter-toth peter-toth force-pushed the SPARK-55535-refactor-kgp-and-spj branch from c1e3e93 to 53034f5 Compare February 18, 2026 18:48
@peter-toth
Contributor Author

This PR requires and contains the changes of #54335. Once that PR is merged, I will rebase this one.

@peter-toth peter-toth changed the title [WIP][SPARK-55535][SPARK-55092][SQL] Refactor KeyGroupedPartitioning and Storage Partition Join [SPARK-55535][SPARK-55092][SQL] Refactor KeyGroupedPartitioning and Storage Partition Join Feb 18, 2026
@peter-toth peter-toth marked this pull request as ready for review February 18, 2026 19:38
@peter-toth
Contributor Author

@dongjoon-hyun
Member

Why don't you merge #54335, @peter-toth? You already got the required community approval on your PR.

Member

@szehon-ho szehon-ho left a comment


I think the CoalescedRDD is a good idea, but I feel it's a bit risky to change the DataSourceRDD so much. Is there another way? Maybe have a custom RDD that holds the grouped partitions? Though I'm not so familiar with this part.

* In addition, its length must be the same as the number of Spark partitions (and thus is a 1-1
* mapping), and each row in `partitionValues` must be unique.
* Represents a partitioning where rows are split across partitions based on transforms defined by
* `expressions`. `partitionKeys`, should contain value of partition key(s) in ascending order,
Member


nit: seems we lost 'if defined', so the comma doesn't make sense anymore

* `partitionKeys` unique.
*
* The `originalPartitionValues`, on the other hand, are partition values from the original input
* The `originalPartitionKeys`, on the other hand, are partition values from the original input
Member


nit: looks like 'originalPartitionKeys' is a copy of 'partitionKeys' before any grouping is applied. Can we clarify the comment, as it's not clear from it currently?

Contributor Author


I asked Claude to rewrite and clarify the documentation of KeyedPartitioning: d58cafe. I think its version sounds even better, but let me know.

lazy val firstKeyedPartitioning = {
  child.outputPartitioning.asInstanceOf[Partitioning with Expression].collectFirst {
    case k: KeyedPartitioning => k
  }.get
Member


nit: how about adding .getOrElse(throw new SparkException("requires child with KeyedPartitioning")) to make it clearer when the error happens?

Contributor Author


fixed in d58cafe

readerStateThreadLocal.remove()
}
} else {
readerState.metrics.foreach(reader.initMetricsValues)
Member


Are we missing a close, maybe here, of the old readerState.reader?
cc @viirya, who I believe fixed a memory leak here, to also take a look at the new approach.

Contributor Author


Indeed. I refined ReaderState and now close readerState.reader properly in 5ca18c3.

@szehon-ho
Member

Overall I like the GroupPartitionsExec idea, but it would definitely be good to have some of @sunchao @viirya @chirag-s-db @cloud-fan also take a look.

@peter-toth
Contributor Author

peter-toth commented Feb 19, 2026

Why don't you merge #54335, @peter-toth? You already got the required community approval on your PR.

Sorry @dongjoon-hyun, I didn't notice your approval yesterday. Thanks for your review! @viirya requested a small change just now; once CI completes I will merge that PR and rebase this one.

@peter-toth
Contributor Author

peter-toth commented Feb 19, 2026

I think the CoalescedRDD is a good idea, but I feel it's a bit risky to change the DataSourceRDD so much. Is there another way? Maybe have a custom RDD that holds the grouped partitions? Though I'm not so familiar with this part.

Initially I wanted to add a new RDD for GroupPartitionsExec, but it was very similar to CoalescedRDD / CoalescedRDDPartition, so just creating a new GroupedPartitionCoalescer, which holds the grouped partitions, seemed like a cleaner approach.
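
Roughly, such a coalescer can be a thin wrapper that maps precomputed groups of parent partition indices onto PartitionGroups. The sketch below is illustrative only (the class name and shape are hypothetical, not the PR's exact code):

import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

// Illustrative sketch: build coalesced partitions from precomputed groups of
// parent partition indices (no locality handling shown).
class GroupedPartitionCoalescerSketch(groups: Seq[Seq[Int]])
  extends PartitionCoalescer with Serializable {

  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val parentPartitions = parent.partitions
    groups.map { indices =>
      val group = new PartitionGroup()
      // Each output partition simply references its precomputed parent partitions.
      indices.foreach(i => group.partitions += parentPartitions(i))
      group
    }.toArray
  }
}

// Usage sketch (no shuffle): coalesce the child RDD with the custom coalescer.
// childRDD.coalesce(groups.size, shuffle = false,
//   partitionCoalescer = Some(new GroupedPartitionCoalescerSketch(groups)))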

DataSourceRDD is now back to its pre-partition-grouping form. IMO we will need to backport the ThreadLocal[ReaderState] fix to previous Spark versions too, to fix the case when there is a coalesce after the scan.

peter-toth added a commit that referenced this pull request Feb 19, 2026
### What changes were proposed in this pull request?

This is a minor refactor of `BroadcastHashJoinExec.outputPartitioning` to:
- simplify the logic and
- make it future-proof by using `Partitioning with Expression` instead of `HashPartitioningLike`.

### Why are the changes needed?
Code cleanup, and to add support for future partitionings that implement `Expression` but not `HashPartitioningLike` (like `KeyedPartitioning` in #54330).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #54335 from peter-toth/SPARK-55551-improve-broadcasthashjoinexec-output-partitioning.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Peter Toth <peter.toth@gmail.com>
@peter-toth peter-toth force-pushed the SPARK-55535-refactor-kgp-and-spj branch from 69ded9c to 8a087af Compare February 19, 2026 16:50
@peter-toth
Contributor Author

#54335 is merged and I've rebased this PR on latest master.

@szehon-ho
Member

szehon-ho commented Feb 19, 2026

Thanks for the refactor. I was actually wondering if this approach works (using Cursor generation, so please double-check if it makes sense). It's more localized to the SPJ case then:

class GroupedPartitionsRDD(
    @transient private val dataSourceRDD: DataSourceRDD,
    groupedPartitions: Seq[Seq[Int]]
  ) extends RDD[InternalRow](dataSourceRDD) {

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val groupedPartition = split.asInstanceOf[GroupedPartitionsRDDPartition]
    val readers = new ArrayBuffer[PartitionReader[_]]()
    var listenerAdded = false

    def addCompletionListener(): Unit = {
      if (!listenerAdded) {
        context.addTaskCompletionListener[Unit] { _ =>
          readers.foreach { reader =>
            try {
              CustomMetrics.updateMetrics(
                reader.currentMetricsValues.toImmutableArraySeq,
                dataSourceRDD.customMetrics)
              reader.close()
            } catch {
              case e: Exception =>
                logWarning(s"Error closing reader: ${e.getMessage}", e)
            }
          }
        }
        listenerAdded = true
      }
    }

    // Use a self-closing iterator wrapper
    new Iterator[InternalRow] {
      private val parentIter = groupedPartition.parentIndices.iterator
      private var currentIterator: Iterator[InternalRow] = null
      private var currentReader: PartitionReader[_] = null

      private def advance(): Boolean = {
        while (currentIterator == null || !currentIterator.hasNext) {
          if (!parentIter.hasNext) {
            // Close current reader if exists
            if (currentReader != null) {
              try {
                CustomMetrics.updateMetrics(
                  currentReader.currentMetricsValues.toImmutableArraySeq,
                  dataSourceRDD.customMetrics)
                currentReader.close()
              } catch {
                case e: Exception =>
                  logWarning(s"Error closing reader: ${e.getMessage}", e)
              }
              currentReader = null
            }
            return false
          }

          // Close previous reader
          if (currentReader != null) {
            try {
              CustomMetrics.updateMetrics(
                currentReader.currentMetricsValues.toImmutableArraySeq,
                dataSourceRDD.customMetrics)
              currentReader.close()
            } catch {
              case e: Exception =>
                logWarning(s"Error closing reader: ${e.getMessage}", e)
            }
          }

          val parentIndex = parentIter.next()
          val inputPartitionOpt = dataSourceRDD.inputPartitions(parentIndex)

          currentIterator = inputPartitionOpt.iterator.flatMap { inputPartition =>
            currentReader = if (dataSourceRDD.columnarReads) {
              dataSourceRDD.partitionReaderFactory.createColumnarReader(inputPartition)
            } else {
              dataSourceRDD.partitionReaderFactory.createReader(inputPartition)
            }

            addCompletionListener()

            val iter = if (dataSourceRDD.columnarReads) {
              new MetricsBatchIterator(
                new PartitionIterator[ColumnarBatch](
                  currentReader.asInstanceOf[PartitionReader[ColumnarBatch]],
                  dataSourceRDD.customMetrics))
            } else {
              new MetricsRowIterator(
                new PartitionIterator[InternalRow](
                  currentReader.asInstanceOf[PartitionReader[InternalRow]],
                  dataSourceRDD.customMetrics))
            }

            iter.asInstanceOf[Iterator[InternalRow]]
          }
        }
        true
      }

      override def hasNext: Boolean = advance()

      override def next(): InternalRow = {
        if (!hasNext) {
          throw new NoSuchElementException("next on empty iterator")
        }
        currentIterator.next()
      }
    }
  }
}


private case class GroupedPartitionsRDDPartition(
    index: Int,
    parentIndices: Array[Int],
    preferredLocation: Option[String] = None
  ) extends Partition

that can be used in GroupPartitionsExec like:

override protected def doExecute(): RDD[InternalRow] = {
  if (groupedPartitions.isEmpty) {
    sparkContext.emptyRDD
  } else {
    child.execute() match {
      case dsRDD: DataSourceRDD =>
        new GroupedPartitionsRDD(
          dsRDD,
          groupedPartitions.map(_._2))
      case _ => // error or fallback?
    }
  }
}

The current code is definitely more Spark-native, reusing CoalescedRDD, but my doubt is about the ThreadLocal and the chance of a memory leak like the one fixed by @viirya: #51503. But I'll defer to others if people like this approach more.

@szehon-ho
Member

Edit: I guess it's what you said you considered in your previous comment.

@szehon-ho
Member

DataSourceRDD is now back to its pre-partition-grouping form. IMO we will need to backport the ThreadLocal[ReaderState] fix to previous Spark versions too, to fix the case when there is a coalesce after the scan.

BTW, I didn't get this; are you saying there is some leak in the current DataSourceRDD that needs a ThreadLocal to fix? Should it be fixed separately?

@peter-toth
Contributor Author

peter-toth commented Feb 19, 2026

The current code is definitely more Spark-native, reusing CoalescedRDD, but my doubt is about the ThreadLocal and the chance of a memory leak like the one fixed by @viirya: #51503. But I'll defer to others if people like this approach more.

As far as I can see, you assume that the child is a DataSourceRDD, but the main point of this change is to move the grouping logic to the new operator (GroupPartitionsExec) so that it can also be inserted into plans where there is no BatchScanExec / DataSourceRDD, e.g. cached or checkpointed plans.

Also, even if there is a BatchScanExec (DataSourceRDD) in the plan, GroupPartitionsExec is inserted right below the join / aggregate where the grouping is needed (like an exchange is inserted), so there can be other nodes / RDDs between GroupPartitionsExec and the data source. So we can't assume that.

DataSourceRDD is now back to its pre-partition-grouping form. IMO we will need to backport the ThreadLocal[ReaderState] fix to previous Spark versions too, to fix the case when there is a coalesce after the scan.

BTW, I didn't get this; are you saying there is some leak in the current DataSourceRDD that needs a ThreadLocal to fix? Should it be fixed separately?

Not necessarily a leak, but there are some issues with custom metrics reporting and with when the readers get closed. Consider the following plan (without this PR):

...
  CoalesceExec (1)
    BatchScanExec (returns 2 partitions)

We have only 1 task in the stage due to the coalesce(1), and that task calls DataSourceRDD.compute() for both input partitions. It doesn't matter whether those partitions are actually grouped or not. Both invocations create their own reader and install their own listener to close that reader and report its custom metrics. But the listeners run only at the end of the task, so the first reader is kept open for too long. What's worse, the 2 reported metrics conflict and only one will be kept.
So I think yes, we need to fix this issue on other branches as well. I don't think we can/should backport this refactor to older versions, but we can extract the ThreadLocal[ReaderState] logic and apply it on other branches. Technically we could fix it separately, but as this PR hugely simplifies the affected DataSourceRDD, it is easier to do it together with this refactor on the master branch.
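
To make the coalesce-after-scan scenario concrete, a repro could look like the sketch below (the path is made up, and it assumes the scan goes through the DSv2 path, i.e. a BatchScanExec backed by DataSourceRDD):

// Hypothetical repro sketch, e.g. in spark-shell.
spark.conf.set("spark.sql.sources.useV1SourceList", "") // force DSv2 file sources
val df = spark.read.parquet("/tmp/table_with_two_partitions") // scan yields 2 input partitions
// coalesce(1) gives a single task that reads both scan partitions, so
// DataSourceRDD.compute() runs twice in that task and installs two listeners.
df.coalesce(1).collect()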

@szehon-ho
Member

szehon-ho commented Feb 19, 2026

As far as I can see, you assume that the child is a DataSourceRDD, but the main point of this change is to move the grouping logic to the new operator (GroupPartitionsExec) so that it can also be inserted into plans where there is no BatchScanExec / DataSourceRDD, e.g. cached or checkpointed plans.

I see, sorry forgot about that case.

Interesting, so you mean we are losing metrics. Should we at least add a test? It may make sense to do it in a separate PR, but that depends on the final approach. The approach does make sense; I am a bit unsure whether ThreadLocal is the best/safest approach, considering the risk of introducing a memory leak. As you can see it's a bit tricky, but I am not so familiar with the DataSourceRDD code.

@peter-toth
Contributor Author

Sure, let me add a test tomorrow, and maybe someone can come up with a better idea to fix it.

@peter-toth
Contributor Author

peter-toth commented Feb 20, 2026

I extracted the metrics reporting bug / fix to SPARK-55619 / #54396 and added a new test.

@szehon-ho
Member

Thank you!
