
Commit 6e19f76

cloud-fan authored and hvanhovell committed
[SPARK-23989][SQL] exchange should copy data before non-serialized shuffle
## What changes were proposed in this pull request?

In Spark SQL, we usually reuse the `UnsafeRow` instance and need to copy the data whenever an operator buffers non-serialized objects. Shuffle may buffer objects if it takes neither the bypass-merge shuffle path nor the unsafe (serialized) shuffle path. `ShuffleExchangeExec.needToCopyObjectsBeforeShuffle` misses the case that, if `spark.sql.shuffle.partitions` is large enough, unsafe shuffle cannot be used and Spark falls back to the non-serialized shuffle. This bug is very hard to hit, since users rarely set such a large number of partitions (over 16 million) for a Spark SQL exchange.

TODO: test

## How was this patch tested?

todo.

Author: Wenchen Fan <[email protected]>

Closes apache#21101 from cloud-fan/shuffle.
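The heart of the issue is object reuse: when a shuffle implementation buffers incoming records as live objects, every buffered reference to a reused `UnsafeRow` ends up pointing at the same mutated instance. Here is a minimal, self-contained Scala sketch of the hazard (not from the patch; `MutableRow` is a hypothetical stand-in for Spark's reused `UnsafeRow`):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a reused, mutable row (e.g. Spark's UnsafeRow).
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

object RowReuseDemo {
  def main(args: Array[String]): Unit = {
    val reused = new MutableRow(0)               // one instance, reused per record
    val buffered = ArrayBuffer.empty[MutableRow]

    for (v <- 1 to 3) {
      reused.value = v                           // producer overwrites the same object
      buffered += reused                         // buffering without a copy: all refs alias
    }
    println(buffered.map(_.value).mkString(",")) // prints 3,3,3 -- data is corrupted

    buffered.clear()
    for (v <- 1 to 3) {
      reused.value = v
      buffered += reused.copy()                  // defensive copy, as the patch ensures
    }
    println(buffered.map(_.value).mkString(",")) // prints 1,2,3 -- correct
  }
}
```

The bypass-merge and serialized (unsafe) shuffle paths avoid this hazard because they write bytes out instead of holding object references, which is exactly the distinction the patch re-checks.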
1 parent 0deaa52 commit 6e19f76

File tree: 1 file changed, +10 −11 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
Lines changed: 10 additions & 11 deletions

@@ -153,12 +153,9 @@ object ShuffleExchangeExec {
    * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
    *
    * @param partitioner the partitioner for the shuffle
-   * @param serializer the serializer that will be used to write rows
    * @return true if rows should be copied before being shuffled, false otherwise
    */
-  private def needToCopyObjectsBeforeShuffle(
-      partitioner: Partitioner,
-      serializer: Serializer): Boolean = {
+  private def needToCopyObjectsBeforeShuffle(partitioner: Partitioner): Boolean = {
     // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
     // passed instead of directly passing the number of partitions in order to guard against
     // corner-cases where a partitioner constructed with `numPartitions` partitions may output
@@ -167,22 +164,24 @@ object ShuffleExchangeExec {
     val shuffleManager = SparkEnv.get.shuffleManager
     val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+    val numParts = partitioner.numPartitions
     if (sortBasedShuffleOn) {
-      val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-      if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
+      if (numParts <= bypassMergeThreshold) {
         // If we're using the original SortShuffleManager and the number of output partitions is
         // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
         // doesn't buffer deserialized records.
         // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
         false
-      } else if (serializer.supportsRelocationOfSerializedObjects) {
+      } else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
         // SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records
         // prior to sorting them. This optimization is only applied in cases where shuffle
         // dependency does not specify an aggregator or ordering and the record serializer has
-        // certain properties. If this optimization is enabled, we can safely avoid the copy.
+        // certain properties and the number of partitions doesn't exceed the limitation. If this
+        // optimization is enabled, we can safely avoid the copy.
         //
-        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
-        // need to check whether the optimization is enabled and supported by our serializer.
+        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, and the
+        // serializer in Spark SQL always satisfy the properties, so we only need to check whether
+        // the number of partitions exceeds the limitation.
         false
       } else {
         // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must
@@ -298,7 +297,7 @@ object ShuffleExchangeExec {
       rdd
     }
 
-    if (needToCopyObjectsBeforeShuffle(part, serializer)) {
+    if (needToCopyObjectsBeforeShuffle(part)) {
       newRdd.mapPartitionsInternal { iter =>
         val getPartitionKey = getPartitionKeyExtractor()
         iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
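Taken together, the patched method reduces to a three-way check on the partition count. Below is a simplified sketch of that decision; the parameterization is ours for illustration, since in Spark the values come from `SparkEnv`, the partitioner, and `SparkConf` rather than being passed in directly:

```scala
// Simplified, illustrative sketch of the post-patch decision logic.
object CopyDecision {
  def mustCopyBeforeShuffle(
      sortBasedShuffleOn: Boolean, // shuffle manager is a SortShuffleManager
      numParts: Int,               // partitioner.numPartitions
      bypassMergeThreshold: Int,   // spark.shuffle.sort.bypassMergeThreshold (default 200)
      maxSerializedParts: Int      // SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE
    ): Boolean = {
    if (sortBasedShuffleOn) {
      if (numParts <= bypassMergeThreshold) {
        // Bypass-merge path writes records straight to per-partition files
        // without buffering deserialized objects: no copy needed.
        false
      } else if (numParts <= maxSerializedParts) {
        // Serialized (unsafe) shuffle buffers serialized bytes, not object
        // references: no copy needed.
        false
      } else {
        // ExternalSorter buffers deserialized records, so the reused row must
        // be copied. This is the case the pre-patch code missed.
        true
      }
    } else {
      // Any other shuffle manager: copy defensively.
      true
    }
  }
}
```

The pre-patch code instead tested `serializer.supportsRelocationOfSerializedObjects` for the middle branch, which is necessary but not sufficient for serialized mode: it ignores the partition-count cap, so with more than `MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE` partitions the shuffle silently fell back to the buffering path without a copy.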
