Skip to content

Commit 6847e93

Browse files
ash211 authored and cloud-fan committed
[SPARK-21563][CORE] Fix race condition when serializing TaskDescriptions and adding jars
## What changes were proposed in this pull request? Fix the race condition when serializing TaskDescriptions and adding jars by keeping the set of jars and files for a TaskSet constant across the lifetime of the TaskSet. Otherwise TaskDescription serialization can produce an invalid serialization when new file/jars are added concurrently as the TaskDescription is serialized. ## How was this patch tested? Additional unit test ensures jars/files contained in the TaskDescription remain constant throughout the lifetime of the TaskSet. Author: Andrew Ash <[email protected]> Closes apache#18913 from ash211/SPARK-21563.
1 parent 34d2134 commit 6847e93

File tree

3 files changed

+46
-3
lines changed

3 files changed

+46
-3
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,6 +1490,8 @@ class SparkContext(config: SparkConf) extends Logging {
14901490
/**
14911491
* Add a file to be downloaded with this Spark job on every node.
14921492
*
1493+
* If a file is added during execution, it will not be available until the next TaskSet starts.
1494+
*
14931495
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
14941496
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
14951497
* use `SparkFiles.get(fileName)` to find its download location.
@@ -1506,6 +1508,8 @@ class SparkContext(config: SparkConf) extends Logging {
15061508
/**
15071509
* Add a file to be downloaded with this Spark job on every node.
15081510
*
1511+
* If a file is added during execution, it will not be available until the next TaskSet starts.
1512+
*
15091513
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
15101514
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
15111515
* use `SparkFiles.get(fileName)` to find its download location.
@@ -1792,6 +1796,9 @@ class SparkContext(config: SparkConf) extends Logging {
17921796

17931797
/**
17941798
* Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future.
1799+
*
1800+
* If a jar is added during execution, it will not be available until the next TaskSet starts.
1801+
*
17951802
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
17961803
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
17971804
*/

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ private[spark] class TaskSetManager(
5656

5757
private val conf = sched.sc.conf
5858

59+
// SPARK-21563 make a copy of the jars/files so they are consistent across the TaskSet
60+
private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*)
61+
private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
62+
5963
// Quantile of tasks at which to start speculation
6064
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
6165
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
@@ -502,8 +506,8 @@ private[spark] class TaskSetManager(
502506
execId,
503507
taskName,
504508
index,
505-
sched.sc.addedFiles,
506-
sched.sc.addedJars,
509+
addedFiles,
510+
addedJars,
507511
task.localProperties,
508512
serializedTask)
509513
}

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.internal.config
3232
import org.apache.spark.internal.Logging
3333
import org.apache.spark.serializer.SerializerInstance
3434
import org.apache.spark.storage.BlockManagerId
35-
import org.apache.spark.util.{AccumulatorV2, ManualClock}
35+
import org.apache.spark.util.{AccumulatorV2, ManualClock, Utils}
3636

3737
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
3838
extends DAGScheduler(sc) {
@@ -1214,6 +1214,38 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
12141214
verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
12151215
}
12161216

1217+
test("SPARK-21563 context's added jars shouldn't change mid-TaskSet") {
1218+
sc = new SparkContext("local", "test")
1219+
val addedJarsPreTaskSet = Map[String, Long](sc.addedJars.toSeq: _*)
1220+
assert(addedJarsPreTaskSet.size === 0)
1221+
1222+
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
1223+
val taskSet1 = FakeTask.createTaskSet(3)
1224+
val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES, clock = new ManualClock)
1225+
1226+
// all tasks from the first taskset have the same jars
1227+
val taskOption1 = manager1.resourceOffer("exec1", "host1", NO_PREF)
1228+
assert(taskOption1.get.addedJars === addedJarsPreTaskSet)
1229+
val taskOption2 = manager1.resourceOffer("exec1", "host1", NO_PREF)
1230+
assert(taskOption2.get.addedJars === addedJarsPreTaskSet)
1231+
1232+
// even with a jar added mid-TaskSet
1233+
val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar")
1234+
sc.addJar(jarPath.toString)
1235+
val addedJarsMidTaskSet = Map[String, Long](sc.addedJars.toSeq: _*)
1236+
assert(addedJarsPreTaskSet !== addedJarsMidTaskSet)
1237+
val taskOption3 = manager1.resourceOffer("exec1", "host1", NO_PREF)
1238+
// which should have the old version of the jars list
1239+
assert(taskOption3.get.addedJars === addedJarsPreTaskSet)
1240+
1241+
// and then the jar does appear in the next TaskSet
1242+
val taskSet2 = FakeTask.createTaskSet(1)
1243+
val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = new ManualClock)
1244+
1245+
val taskOption4 = manager2.resourceOffer("exec1", "host1", NO_PREF)
1246+
assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
1247+
}
1248+
12171249
private def createTaskResult(
12181250
id: Int,
12191251
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {

0 commit comments

Comments (0)