
Commit 39f75b4

liutang123 authored and srowen committed
[SPARK-27192][CORE] spark.task.cpus should be less or equal than spark.executor.cores
## What changes were proposed in this pull request?

Check spark.task.cpus against the cores available per executor before creating the TaskScheduler in SparkContext.

## How was this patch tested?

Unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#24261 from liutang123/SPARK-27192.

Authored-by: liulijia <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 982c4c8 commit 39f75b4

File tree: 6 files changed, +62 -25 lines changed

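
Illustrative only (values chosen for this note, not taken from the patch): after this change, asking for more CPUs per task than an executor can ever provide fails fast at SparkContext creation, rather than producing a scheduler that can never launch a task.

    import org.apache.spark.{SparkConf, SparkContext, SparkException}

    val conf = new SparkConf()
      .setMaster("local[2]")          // 2 cores available to the single local executor
      .setAppName("task-cpus-check")
      .set("spark.task.cpus", "3")    // asks for more cores per task than exist

    // Expected: a SparkException whose message contains "spark.task.cpus must be <="
    try {
      new SparkContext(conf)
    } catch {
      case e: SparkException => println(e.getMessage)
    }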

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 0 additions & 3 deletions
@@ -222,9 +222,6 @@ private[spark] class ExecutorAllocationManager(
       throw new SparkException("Dynamic allocation of executors requires the external " +
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
-    if (tasksPerExecutorForFullParallelism == 0) {
-      throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.")
-    }
 
     if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
       throw new SparkException(
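
Note (reasoning inferred from the surrounding code, not stated in the patch): tasksPerExecutorForFullParallelism is an integer division of spark.executor.cores by spark.task.cpus, so it is zero exactly when an executor has fewer cores than one task needs. The new check in SparkContext.createTaskScheduler catches that case earlier, which is why this guard can be removed. A rough Scala sketch of the arithmetic, with hypothetical values:

    // Mirrors how the removed guard would have fired; values are illustrative.
    val executorCores = 1
    val cpusPerTask = 2
    val tasksPerExecutorForFullParallelism = executorCores / cpusPerTask  // integer division => 0
    assert(tasksPerExecutorForFullParallelism == 0)  // previously triggered the SparkException above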

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 0 additions & 10 deletions
@@ -575,16 +575,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       }
     }
 
-    if (contains(EXECUTOR_CORES) && contains(CPUS_PER_TASK)) {
-      val executorCores = get(EXECUTOR_CORES)
-      val taskCpus = get(CPUS_PER_TASK)
-
-      if (executorCores < taskCpus) {
-        throw new SparkException(
-          s"${EXECUTOR_CORES.key} must not be less than ${CPUS_PER_TASK.key}.")
-      }
-    }
-
     val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 24 additions & 0 deletions
@@ -2665,8 +2665,27 @@ object SparkContext extends Logging {
     // When running locally, don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
 
+    // SPARK-26340: Ensure that executor's core num meets at least one task requirement.
+    def checkCpusPerTask(
+        clusterMode: Boolean,
+        maxCoresPerExecutor: Option[Int]): Unit = {
+      val cpusPerTask = sc.conf.get(CPUS_PER_TASK)
+      if (clusterMode && sc.conf.contains(EXECUTOR_CORES)) {
+        if (sc.conf.get(EXECUTOR_CORES) < cpusPerTask) {
+          throw new SparkException(s"${CPUS_PER_TASK.key}" +
+            s" must be <= ${EXECUTOR_CORES.key} when run on $master.")
+        }
+      } else if (maxCoresPerExecutor.isDefined) {
+        if (maxCoresPerExecutor.get < cpusPerTask) {
+          throw new SparkException(s"Only ${maxCoresPerExecutor.get} cores available per executor" +
+            s" when run on $master, and ${CPUS_PER_TASK.key} must be <= it.")
+        }
+      }
+    }
+
     master match {
       case "local" =>
+        checkCpusPerTask(clusterMode = false, Some(1))
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
@@ -2679,6 +2698,7 @@
         if (threadCount <= 0) {
           throw new SparkException(s"Asked to run locally with $threadCount threads")
         }
+        checkCpusPerTask(clusterMode = false, Some(threadCount))
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
@@ -2689,19 +2709,22 @@
         // local[*, M] means the number of cores on the computer with M failures
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
+        checkCpusPerTask(clusterMode = false, Some(threadCount))
         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
         val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
 
       case SPARK_REGEX(sparkUrl) =>
+        checkCpusPerTask(clusterMode = true, None)
         val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
         scheduler.initialize(backend)
         (backend, scheduler)
 
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
+        checkCpusPerTask(clusterMode = true, Some(coresPerSlave.toInt))
         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
         if (sc.executorMemory > memoryPerSlaveInt) {
@@ -2722,6 +2745,7 @@
         (backend, scheduler)
 
       case masterUrl =>
+        checkCpusPerTask(clusterMode = true, None)
         val cm = getClusterManager(masterUrl) match {
           case Some(clusterMgr) => clusterMgr
           case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
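
As a rough summary of checkCpusPerTask above: cluster-mode masters compare spark.task.cpus against spark.executor.cores when it is explicitly set, while the local variants compare against the thread count parsed from the master string (local-cluster also passes its cores-per-worker as a fallback). A hedged sketch exercising the cluster-mode branch, with illustrative values:

    import org.apache.spark.{SparkConf, SparkContext, SparkException}

    val conf = new SparkConf()
      .setMaster("local-cluster[1, 1, 1024]") // 1 worker with 1 core
      .setAppName("cluster-mode-check")
      .set("spark.executor.cores", "1")
      .set("spark.task.cpus", "2")

    // Expected: SparkException("spark.task.cpus must be <= spark.executor.cores when run on ..."),
    // because checkCpusPerTask(clusterMode = true, Some(1)) compares the two settings first.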

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 0 additions & 7 deletions
@@ -140,13 +140,6 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(sc.appName === "My other app")
   }
 
-  test("creating SparkContext with cpus per tasks bigger than cores per executors") {
-    val conf = new SparkConf(false)
-      .set(EXECUTOR_CORES, 1)
-      .set(CPUS_PER_TASK, 2)
-    intercept[SparkException] { sc = new SparkContext(conf) }
-  }
-
   test("nested property names") {
     // This wasn't supported by some external conf parsing libraries
     System.setProperty("spark.test.a", "a")

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 21 additions & 0 deletions
@@ -710,6 +710,27 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       assert(runningTaskIds.isEmpty)
     }
   }
+
+  test(s"Avoid setting ${CPUS_PER_TASK.key} unreasonably (SPARK-27192)") {
+    val FAIL_REASON = s"${CPUS_PER_TASK.key} must be <="
+    Seq(
+      ("local", 2, None),
+      ("local[2]", 3, None),
+      ("local[2, 1]", 3, None),
+      ("spark://test-spark-cluster", 2, Option(1)),
+      ("local-cluster[1, 1, 1000]", 2, Option(1)),
+      ("yarn", 2, Option(1))
+    ).foreach { case (master, cpusPerTask, executorCores) =>
+      val conf = new SparkConf()
+      conf.set(CPUS_PER_TASK, cpusPerTask)
+      executorCores.map(executorCores => conf.set(EXECUTOR_CORES, executorCores))
+      val ex = intercept[SparkException] {
+        sc = new SparkContext(master, "test", conf)
+      }
+      assert(ex.getMessage.contains(FAIL_REASON))
+      resetSparkContext()
+    }
+  }
 }
 
 object SparkContextSuite {

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 17 additions & 5 deletions
@@ -77,7 +77,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }
 
   def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
-    val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
+    setupSchedulerWithMaster("local", confs: _*)
+  }
+
+  def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = {
+    val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite")
     confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
     taskScheduler = new TaskSchedulerImpl(sc)
@@ -155,7 +159,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
   test("Scheduler correctly accounts for multiple CPUs per task") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0),
       new WorkerOffer("executor1", "host1", 0))
@@ -185,7 +191,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
   test("Scheduler does not crash when tasks are not serializable") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
     val numFreeCores = 1
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
@@ -1241,7 +1249,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
   test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
 
     val numFreeCores = 3
     val workerOffers = IndexedSeq(
@@ -1258,7 +1268,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
   test("schedule tasks for a barrier taskSet if all tasks can be launched together") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString)
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
 
     val numFreeCores = 3
     val workerOffers = IndexedSeq(
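
A note on the test changes above: with the new validation, a suite that sets spark.task.cpus = 2 can no longer run against plain "local" (a single core), since SparkContext creation would now fail with "spark.task.cpus must be <=". Roughly, the old and new setups compare as (a sketch using the suite's own helpers):

    // Before (would now throw during SparkContext construction):
    //   setupScheduler(config.CPUS_PER_TASK.key -> "2")   // master = "local"
    // After: pass a master whose core count matches the per-task requirement.
    val taskScheduler = setupSchedulerWithMaster(
      s"local[$taskCpus]",
      config.CPUS_PER_TASK.key -> taskCpus.toString)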
