Skip to content

Commit 0a32238

Browse files
akupch authored and srowen committed
[SPARK-25885][CORE][MINOR] HighlyCompressedMapStatus deserialization/construction optimization
## What changes were proposed in this pull request?

Removal of intermediate structures in HighlyCompressedMapStatus will speed up its creation and deserialization time.

https://issues.apache.org/jira/browse/SPARK-25885

## How was this patch tested?

Additional tests are not necessary for the patch.

Closes apache#22894 from Koraseg/mapStatusesOptimization.

Authored-by: koraseg <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 8fbc183 commit 0a32238

File tree

1 file changed

+7
-8
lines changed

1 file changed

+7
-8
lines changed

core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
2020
import java.io.{Externalizable, ObjectInput, ObjectOutput}
2121

2222
import scala.collection.mutable
23-
import scala.collection.mutable.ArrayBuffer
2423

2524
import org.roaringbitmap.RoaringBitmap
2625

@@ -149,7 +148,7 @@ private[spark] class HighlyCompressedMapStatus private (
149148
private[this] var numNonEmptyBlocks: Int,
150149
private[this] var emptyBlocks: RoaringBitmap,
151150
private[this] var avgSize: Long,
152-
private var hugeBlockSizes: Map[Int, Byte])
151+
private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte])
153152
extends MapStatus with Externalizable {
154153

155154
// loc could be null when the default constructor is called during deserialization
@@ -189,13 +188,13 @@ private[spark] class HighlyCompressedMapStatus private (
189188
emptyBlocks.readExternal(in)
190189
avgSize = in.readLong()
191190
val count = in.readInt()
192-
val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
191+
val hugeBlockSizesImpl = mutable.Map.empty[Int, Byte]
193192
(0 until count).foreach { _ =>
194193
val block = in.readInt()
195194
val size = in.readByte()
196-
hugeBlockSizesArray += Tuple2(block, size)
195+
hugeBlockSizesImpl(block) = size
197196
}
198-
hugeBlockSizes = hugeBlockSizesArray.toMap
197+
hugeBlockSizes = hugeBlockSizesImpl
199198
}
200199
}
201200

@@ -215,7 +214,7 @@ private[spark] object HighlyCompressedMapStatus {
215214
val threshold = Option(SparkEnv.get)
216215
.map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
217216
.getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
218-
val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
217+
val hugeBlockSizes = mutable.Map.empty[Int, Byte]
219218
while (i < totalNumBlocks) {
220219
val size = uncompressedSizes(i)
221220
if (size > 0) {
@@ -226,7 +225,7 @@ private[spark] object HighlyCompressedMapStatus {
226225
totalSmallBlockSize += size
227226
numSmallBlocks += 1
228227
} else {
229-
hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
228+
hugeBlockSizes(i) = MapStatus.compressSize(uncompressedSizes(i))
230229
}
231230
} else {
232231
emptyBlocks.add(i)
@@ -241,6 +240,6 @@ private[spark] object HighlyCompressedMapStatus {
241240
emptyBlocks.trim()
242241
emptyBlocks.runOptimize()
243242
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
244-
hugeBlockSizesArray.toMap)
243+
hugeBlockSizes)
245244
}
246245
}

0 commit comments

Comments (0)