Skip to content

Commit 8f6c6d6

Browse files
gaoyajun02Mridul Muralidharan
authored andcommitted
[SPARK-52923][CORE] Allow ShuffleManager to control push merge during shuffle registration
### What changes were proposed in this pull request? This PR moves the `shuffleManager.registerShuffle()` call to occur after the initialization of `_shuffleMergeAllowed` in `ShuffleDependency`. ### Why are the changes needed? While `spark.shuffle.push.enabled` provides global control over push-based shuffle, there are scenarios requiring more granular control:  - Mass spark application migration scenarios where different jobs may need different shuffle strategies - Remote shuffle manager(e.g. celeborn/uniffle) need shuffle-level fallback capabilities to push-based shuffle - Dynamic decision making based on shuffle characteristics during shuffle registration ### Does this PR introduce _any_ user-facing change? No, this is an internal refactoring that maintains backward compatibility. The default behavior remains unchanged. ### How was this patch tested? - Existing unit tests continue to pass - The change only affects the order of initialization, not the logic ### Was this patch authored or co-authored using generative AI tooling? No Closes #51629 from gaoyajun02/SPARK-52923. Authored-by: gaoyajun02 <gaoyajun02@meituan.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent 23d9253 commit 8f6c6d6

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
127127

128128
val shuffleId: Int = _rdd.context.newShuffleId()
129129

130-
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
131-
shuffleId, this)
132-
133130
private[this] val numPartitions = rdd.partitions.length
134131

135132
// By default, shuffle merge is allowed for ShuffleDependency if push based shuffle
136133
// is enabled
137134
private[this] var _shuffleMergeAllowed = canShuffleMergeBeEnabled()
138135

136+
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
137+
shuffleId, this)
138+
139139
private[spark] def setShuffleMergeAllowed(shuffleMergeAllowed: Boolean): Unit = {
140140
_shuffleMergeAllowed = shuffleMergeAllowed
141141
}

0 commit comments

Comments
 (0)