Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 8bf7f1e

Browse files
wangyumcloud-fan
authored andcommitted
[SPARK-21133][CORE] Fix HighlyCompressedMapStatus#writeExternal throws NPE
## What changes were proposed in this pull request? Fix HighlyCompressedMapStatus#writeExternal NPE: ``` 17/06/18 15:00:27 ERROR Utils: Exception encountered java.lang.NullPointerException at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619) at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562) at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 17/06/18 15:00:27 ERROR MapOutputTrackerMaster: java.lang.NullPointerException java.io.IOException: java.lang.NullPointerException at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1310) at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619) at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562) at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) ... 17 more 17/06/18 15:00:27 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.17.47.20:50188 17/06/18 15:00:27 ERROR Utils: Exception encountered java.lang.NullPointerException at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply$mcV$sp(MapStatus.scala:171) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.scheduler.HighlyCompressedMapStatus$$anonfun$writeExternal$2.apply(MapStatus.scala:167) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) at org.apache.spark.scheduler.HighlyCompressedMapStatus.writeExternal(MapStatus.scala:167) at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:617) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:616) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337) at org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:619) at org.apache.spark.MapOutputTrackerMaster.getSerializedMapOutputStatuses(MapOutputTracker.scala:562) at org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:351) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` ## How was this patch tested? manual tests Author: Yuming Wang <[email protected]> Closes apache#18343 from wangyum/SPARK-21133. (cherry picked from commit 9b57cd8) Signed-off-by: Wenchen Fan <[email protected]>
1 parent cf10fa8 commit 8bf7f1e

File tree

3 files changed

+18
-3
lines changed

3 files changed

+18
-3
lines changed

core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ private[spark] class HighlyCompressedMapStatus private (
141141
private[this] var numNonEmptyBlocks: Int,
142142
private[this] var emptyBlocks: RoaringBitmap,
143143
private[this] var avgSize: Long,
144-
@transient private var hugeBlockSizes: Map[Int, Byte])
144+
private var hugeBlockSizes: Map[Int, Byte])
145145
extends MapStatus with Externalizable {
146146

147147
// loc could be null when the default constructor is called during deserialization

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class KryoSerializer(conf: SparkConf)
175175
kryo.register(None.getClass)
176176
kryo.register(Nil.getClass)
177177
kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
178+
kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
178179
kryo.register(classOf[ArrayBuffer[Any]])
179180

180181
kryo.setClassLoader(classLoader)

core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ import scala.util.Random
2424
import org.mockito.Mockito._
2525
import org.roaringbitmap.RoaringBitmap
2626

27-
import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
27+
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
2828
import org.apache.spark.internal.config
29-
import org.apache.spark.serializer.JavaSerializer
29+
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
3030
import org.apache.spark.storage.BlockManagerId
3131

3232
class MapStatusSuite extends SparkFunSuite {
@@ -154,4 +154,18 @@ class MapStatusSuite extends SparkFunSuite {
154154
case part => assert(status2.getSizeForBlock(part) >= sizes(part))
155155
}
156156
}
157+
158+
test("SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE") {
159+
val conf = new SparkConf()
160+
.set("spark.serializer", classOf[KryoSerializer].getName)
161+
.setMaster("local")
162+
.setAppName("SPARK-21133")
163+
val sc = new SparkContext(conf)
164+
try {
165+
val count = sc.parallelize(0 until 3000, 10).repartition(2001).collect().length
166+
assert(count === 3000)
167+
} finally {
168+
sc.stop()
169+
}
170+
}
157171
}

0 commit comments

Comments
 (0)