
Commit 39dfaf2

Hieu Huynh authored and tgravescs committed
[SPARK-24519] Make the threshold for highly compressed map status configurable
**Problem**

MapStatus uses a hardcoded value of 2000 partitions to determine whether it should use HighlyCompressedMapStatus. We should make this configurable so that users can more easily tune their jobs with respect to it, without having to modify their code to change the number of partitions. Note we can leave this as an internal/undocumented config for now, until we have more advice for users on how to set it.

Some of my reasoning: a config gives you a way to change behavior without the user having to change code, redeploy the jar, and run again; you can simply change the config and rerun, which also allows for easier experimentation. Changing the number of partitions has other side effects, good or bad depending on the situation: it can increase the number of output files when you don't want it to, and it affects the number of tasks needed and thus the executors required to run in parallel, etc. There have been various talks about this number at Spark summits where people have told users to increase the partition count to 2001, and a search for "spark 2000 partitions" turns up plenty of discussion of this number. This shows that people are already modifying their code to take the threshold into account, so it seems better to make it configurable. Once we have more advice for users, we could expose this config and document it.

**What changes were proposed in this pull request?**

The hardcoded value mentioned above becomes configurable under the name _SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS_, with a default value of 2000. Users can set it to the value they want via the property name _spark.shuffle.minNumPartitionsToHighlyCompress_.

**How was this patch tested?**

I wrote a unit test to make sure that the default value is 2000 and that an _IllegalArgumentException_ is thrown if the user sets it to a non-positive value. The unit test also checks that HighlyCompressedMapStatus is correctly used when the number of partitions is greater than _SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS_.

Author: Hieu Huynh <“[email protected]”>

Closes apache#21527 from hthuynh2/spark_branch_1.
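As a quick illustration of the intended workflow, here is a minimal sketch of setting the new threshold from application code. Only the property key comes from this patch; the app name and the value 1000 are hypothetical:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: the entry is internal/undocumented, so it is set via its raw
// property key rather than through a public constant.
val conf = new SparkConf()
  .setAppName("shuffle-threshold-demo") // hypothetical app name
  // With this override, any stage whose map output has more than 1000
  // partitions gets a HighlyCompressedMapStatus instead of the default 2000.
  .set("spark.shuffle.minNumPartitionsToHighlyCompress", "1000")
val sc = new SparkContext(conf)
```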
1 parent 92c2f00 commit 39dfaf2

File tree

3 files changed: +38 -1 lines changed


core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 7 additions & 0 deletions
@@ -552,4 +552,11 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("1h")
 
+  private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
+    ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
+      .internal()
+      .doc("Number of partitions to determine if MapStatus should use HighlyCompressedMapStatus")
+      .intConf
+      .checkValue(v => v > 0, "The value should be a positive integer.")
+      .createWithDefault(2000)
 }
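A note on the validation above: the predicate passed to checkValue is what ultimately surfaces the IllegalArgumentException exercised in the test suite below. The following is a self-contained, hypothetical mini-model of a validated int entry; IntEntry is illustrative and is not Spark's actual ConfigEntry/ConfigBuilder API:

```scala
// Hypothetical mini-model of a validated int config entry.
final case class IntEntry(key: String, default: Int,
    check: Int => Boolean, errMsg: String) {
  def read(raw: Option[String]): Int = {
    val v = raw.map(_.trim.toInt).getOrElse(default)
    require(check(v), errMsg) // require throws IllegalArgumentException
    v
  }
}

val entry = IntEntry("spark.shuffle.minNumPartitionsToHighlyCompress",
  default = 2000, check = _ > 0, errMsg = "The value should be a positive integer.")

assert(entry.read(None) == 2000)          // unset: default applies
assert(entry.read(Some("1000")) == 1000)  // explicit override
// entry.read(Some("0")) would throw IllegalArgumentException
```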

core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala

Lines changed: 3 additions & 1 deletion
@@ -50,7 +50,9 @@ private[spark] sealed trait MapStatus {
 private[spark] object MapStatus {
 
   def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
-    if (uncompressedSizes.length > 2000) {
+    if (uncompressedSizes.length > Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
+      .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
       HighlyCompressedMapStatus(loc, uncompressedSizes)
     } else {
       new CompressedMapStatus(loc, uncompressedSizes)
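The Option(SparkEnv.get) wrapper is there because SparkEnv.get can return null, for instance in tests where no env has been started, in which case the code falls back to the entry's compiled-in default. Below is a standalone sketch of that null-safe pattern, with hypothetical stand-ins for SparkEnv:

```scala
// Hypothetical stand-ins: Env plays the role of SparkEnv, and globalEnv the
// role of SparkEnv.get (which is null before any env has been created).
final case class Env(threshold: Int)
var globalEnv: Env = null

val defaultThreshold = 2000 // mirrors the entry's createWithDefault(2000)

// Same null-safe pattern as MapStatus.apply: prefer the live env's setting,
// otherwise fall back to the compiled-in default.
def threshold: Int = Option(globalEnv).map(_.threshold).getOrElse(defaultThreshold)

assert(threshold == 2000) // no env yet: the default wins
globalEnv = Env(500)
assert(threshold == 500)  // env present: its setting wins
```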

core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala

Lines changed: 28 additions & 0 deletions
@@ -188,4 +188,32 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  test("SPARK-24519: HighlyCompressedMapStatus has configurable threshold") {
+    val conf = new SparkConf()
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+    val sizes = Array.fill[Long](500)(150L)
+    // Test default value
+    val status = MapStatus(null, sizes)
+    assert(status.isInstanceOf[CompressedMapStatus])
+    // Test non-positive values
+    for (s <- -1 to 0) {
+      assertThrows[IllegalArgumentException] {
+        conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
+        val status = MapStatus(null, sizes)
+      }
+    }
+    // Test positive values
+    Seq(1, 100, 499, 500, 501).foreach { s =>
+      conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
+      val status = MapStatus(null, sizes)
+      if (sizes.length > s) {
+        assert(status.isInstanceOf[HighlyCompressedMapStatus])
+      } else {
+        assert(status.isInstanceOf[CompressedMapStatus])
+      }
+    }
+  }
 }
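Assuming a standard Spark source checkout, this suite can be run on its own with sbt, e.g. `build/sbt "core/testOnly *MapStatusSuite"` (a plausible invocation, not part of the patch).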
