Skip to content

Commit 70462f2

Browse files
NiharS authored and srowen committed
[SPARK-24926][CORE] Ensure numCores is used consistently in all netty configurations
## What changes were proposed in this pull request? Netty could just ignore user-provided configurations. In particular, spark.driver.cores would be ignored when considering the number of cores available to netty (which would usually just default to Runtime.availableProcessors() ). In transport configurations, the number of threads are based directly on how many cores the system believes it has available, and in yarn cluster mode this would generally overshoot the user-preferred value. ## How was this patch tested? As this is mostly a configuration change, tests were done manually by adding spark-submit confs and verifying the number of threads started by netty was what was expected. Passes scalastyle checks from dev/run-tests Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Nihar Sheth <[email protected]> Closes apache#21885 from NiharS/usableCores.
1 parent 684c719 commit 70462f2

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ class SparkContext(config: SparkConf) extends Logging {
254254
conf: SparkConf,
255255
isLocal: Boolean,
256256
listenerBus: LiveListenerBus): SparkEnv = {
257-
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
257+
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
258258
}
259259

260260
private[spark] def env: SparkEnv = _env
@@ -2668,17 +2668,30 @@ object SparkContext extends Logging {
26682668
}
26692669

26702670
/**
2671-
* The number of driver cores to use for execution in local mode, 0 otherwise.
2671+
* The number of cores available to the driver to use for tasks such as I/O with Netty
26722672
*/
26732673
private[spark] def numDriverCores(master: String): Int = {
2674+
numDriverCores(master, null)
2675+
}
2676+
2677+
/**
2678+
* The number of cores available to the driver to use for tasks such as I/O with Netty
2679+
*/
2680+
private[spark] def numDriverCores(master: String, conf: SparkConf): Int = {
26742681
def convertToInt(threads: String): Int = {
26752682
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
26762683
}
26772684
master match {
26782685
case "local" => 1
26792686
case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
26802687
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
2681-
case _ => 0 // driver is not used for execution
2688+
case "yarn" =>
2689+
if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
2690+
conf.getInt("spark.driver.cores", 0)
2691+
} else {
2692+
0
2693+
}
2694+
case _ => 0 // Either driver is not being used, or its core count will be interpolated later
26822695
}
26832696
}
26842697

core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ private[netty] class NettyRpcEnv(
5050
private[netty] val transportConf = SparkTransportConf.fromSparkConf(
5151
conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
5252
"rpc",
53-
conf.getInt("spark.rpc.io.threads", 0))
53+
conf.getInt("spark.rpc.io.threads", numUsableCores))
5454

5555
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
5656

0 commit comments

Comments (0)