Skip to content

Commit b75bd17

Browse files
squitoMarcelo Vanzin
authored andcommitted
[SPARK-21928][CORE] Set classloader on SerializerManager's private kryo
## What changes were proposed in this pull request? We have to make sure that SerializerManager's private instance of kryo also uses the right classloader, regardless of the current thread classloader. In particular, this fixes serde during remote cache fetches, as those occur in netty threads. ## How was this patch tested? Manual tests & existing suite via jenkins. I haven't been able to reproduce this is in a unit test, because when a remote RDD partition can be fetched, there is a warning message and then the partition is just recomputed locally. I manually verified the warning message is no longer present. Author: Imran Rashid <[email protected]> Closes apache#19280 from squito/SPARK-21928_ser_classloader.
1 parent f10cbf1 commit b75bd17

File tree

3 files changed

+9
-1
lines changed

3 files changed

+9
-1
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ private[spark] class Executor(
131131

132132
// Set the classloader for serializer
133133
env.serializer.setDefaultClassLoader(replClassLoader)
134+
// SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads
135+
// for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
136+
env.serializerManager.setDefaultClassLoader(replClassLoader)
134137

135138
// Max size of direct result. If task result is bigger than this, we use the block manager
136139
// to send the result back.

core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ private[spark] class SerializerManager(
4141

4242
private[this] val kryoSerializer = new KryoSerializer(conf)
4343

44+
def setDefaultClassLoader(classLoader: ClassLoader): Unit = {
45+
kryoSerializer.setDefaultClassLoader(classLoader)
46+
}
47+
4448
private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
4549
private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
4650
val primitiveClassTags = Set[ClassTag[_]](

core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import org.apache.spark.metrics.MetricsSystem
4242
import org.apache.spark.rdd.RDD
4343
import org.apache.spark.rpc.RpcEnv
4444
import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription}
45-
import org.apache.spark.serializer.JavaSerializer
45+
import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
4646
import org.apache.spark.shuffle.FetchFailedException
4747
import org.apache.spark.storage.BlockManagerId
4848
import org.apache.spark.util.UninterruptibleThread
@@ -234,6 +234,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
234234
val mockMemoryManager = mock[MemoryManager]
235235
when(mockEnv.conf).thenReturn(conf)
236236
when(mockEnv.serializer).thenReturn(serializer)
237+
when(mockEnv.serializerManager).thenReturn(mock[SerializerManager])
237238
when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
238239
when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
239240
when(mockEnv.memoryManager).thenReturn(mockMemoryManager)

0 commit comments

Comments
 (0)