Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ object SparkEnv extends Logging {
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
None,
bindAddress,
advertiseAddress,
port,
Expand All @@ -192,6 +193,7 @@ object SparkEnv extends Logging {
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
numaNodeId: Option[Int],
hostname: String,
port: Int,
numCores: Int,
Expand All @@ -200,6 +202,7 @@ object SparkEnv extends Logging {
val env = create(
conf,
executorId,
numaNodeId,
hostname,
hostname,
port,
Expand All @@ -214,9 +217,11 @@ object SparkEnv extends Logging {
/**
* Helper method to create a SparkEnv for a driver or an executor.
*/
// scalastyle:off
private def create(
conf: SparkConf,
executorId: String,
numaNodeId: Option[Int],
bindAddress: String,
advertiseAddress: String,
port: Int,
Expand All @@ -225,7 +230,7 @@ object SparkEnv extends Logging {
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

// scalastyle:on
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

// Listener bus is only used on the driver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
private def run(
driverUrl: String,
executorId: String,
numaNodeId: Option[Int],
hostname: String,
cores: Int,
appId: String,
Expand Down Expand Up @@ -221,7 +222,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
driverConf, executorId, numaNodeId, hostname, port,
cores, cfg.ioEncryptionKey, isLocal = false)

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
Expand All @@ -239,6 +241,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
var hostname: String = null
var cores: Int = 0
var appId: String = null
// Numa node this executor should bind to; None until (and unless) the
// --numa-node-id argument is parsed. Initialized to None, not null: an
// Option var initialized to null defeats the whole point of Option and
// makes any numaNodeId.nonEmpty / .map call throw NPE when the flag is absent.
var numaNodeId: Option[Int] = None
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()

Expand All @@ -260,6 +263,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
case ("--numa-node-id") :: value :: tail =>
numaNodeId = Some(value.trim.toInt)
argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
Expand All @@ -281,7 +287,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
printUsageAndExit()
}

run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
run(driverUrl, executorId, numaNodeId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
}

Expand All @@ -295,6 +301,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
| --driver-url <driverUrl>
| --executor-id <executorId>
| --hostname <hostname>
| --numa-node-id <numaNodeId>
| --cores <cores>
| --app-id <appid>
| --worker-url <workerUrl>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ private[spark] class ApplicationMaster(
val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
"<hostname>", None, executorMemory, executorCores, appId, securityMgr, localResources)
dummyRunner.launchContextDebugInfo()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ private[yarn] class ExecutorRunnable(
masterAddress: String,
executorId: String,
hostname: String,
numaNodeId: Option[Int],
executorMemory: Int,
executorCores: Int,
appId: String,
Expand All @@ -70,6 +71,7 @@ private[yarn] class ExecutorRunnable(
def launchContextDebugInfo(): String = {
val commands = prepareCommand()
val env = prepareEnvironment()
logInfo(s"the command is $commands")

s"""
|===============================================================================
Expand Down Expand Up @@ -206,9 +208,19 @@ private[yarn] class ExecutorRunnable(
Seq("--user-class-path", "file:" + absPath)
}.toSeq

val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED)
// Don't need numa binding for driver.
val (numaCtlCommand, numaNodeOpts) = if (numaEnabled && executorId != "<executorId>"
&& numaNodeId.nonEmpty) {
val command = s"numactl --cpubind=${numaNodeId.get} --membind=${numaNodeId.get} "
(command, Seq("--numa-node-id", numaNodeId.get.toString))
} else {
("", Nil)
}

YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
val commands = prefixEnv ++ Seq(
YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java",
"-server") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
Expand All @@ -217,10 +229,12 @@ private[yarn] class ExecutorRunnable(
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId) ++
numaNodeOpts ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
logInfo(s"the command is $commands")

// TODO: it would be nicer to just make sure there are no null commands here
commands.map(s => if (s == null) "null" else s).toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
* The public methods of this class are thread-safe. All methods that mutate state are
* synchronized.
*/

private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
Expand All @@ -70,6 +71,9 @@ private[yarn] class YarnAllocator(

import YarnAllocator._




// RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
Expand Down Expand Up @@ -173,6 +177,14 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)

// Per-host executor launch counter. The counter is consumed round-robin to
// pick the numa node id a newly launched executor on that host should bind to.
// NOTE(review): appears superseded by hostToNumaInfo's occupancy tracking and
// now only feeds a log line — confirm before removing.
private[yarn] val hostToNuma = mutable.HashMap.empty[String, Int]

// Numa bookkeeping for one host: which container id was bound to which numa
// node, plus an occupancy flag per numa node (index = numa node id).
// NOTE(review): "cotainer2numa" is a typo for "container2numa", but the field
// is referenced elsewhere in this file — rename only in a coordinated sweep.
private[yarn] case class NumaInfo(
    cotainer2numa: mutable.HashMap[String, Int],
    numaUsed: Array[Boolean])

// Per-host numa state: container-to-numa-node assignments and per-node
// occupancy, populated lazily as containers are launched on each host.
private[yarn] val hostToNumaInfo = mutable.HashMap.empty[String, NumaInfo]

/**
* Use a different clock for YarnAllocator. This is mainly used for testing.
*/
Expand Down Expand Up @@ -489,6 +501,40 @@ private[yarn] class YarnAllocator(
for (container <- containersToUse) {
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost

// Select the numa node id that the executor should bind to, round-robin
// from 0 to totalNumaNumber for each host.
// TODO: This is ugly; numa placement should really be handled by the
// resource manager (such as YARN) rather than guessed here.
val preSize = hostToNuma.getOrElseUpdate(executorHostname, 0)
var numaNodeId = preSize % 2
hostToNuma.put(executorHostname, preSize + 1)
logInfo(s"old numaNodeId: $numaNodeId on host $executorHostname")

var newnumaNodeId = -1
// new numaid binding
val numaInfo = hostToNumaInfo.getOrElseUpdate(executorHostname,
NumaInfo(new mutable.HashMap[String, Int], Array(false,false)))
logInfo("numaInfo.numaUsed(0): " + numaInfo.numaUsed(0))
var i = 0
var boolbreak = true
// avoid using keyword break
while(i < 2 && boolbreak) {
logInfo(s"i: $i")
logInfo(s"numaInfo.numaUsed($i): " + numaInfo.numaUsed(i))
val numabool = numaInfo.numaUsed(i)
if(!numabool) {
newnumaNodeId = i
numaInfo.cotainer2numa.put(container.getId.toString, newnumaNodeId)
numaInfo.numaUsed(i) = true
// break
boolbreak = false
}
i = i+1
}

numaNodeId = newnumaNodeId
logInfo(s"new numaNodeId: $newnumaNodeId on host $executorHostname," + container.getId.toString)
val containerId = container.getId
val executorId = executorIdCounter.toString
assert(container.getResource.getMemory >= resource.getMemory)
Expand Down Expand Up @@ -517,6 +563,7 @@ private[yarn] class YarnAllocator(
driverUrl,
executorId,
executorHostname,
Some(numaNodeId),
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
Expand Down Expand Up @@ -585,9 +632,21 @@ private[yarn] class YarnAllocator(
case _ =>
// Enqueue the timestamp of failed executor
failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
var numaNodeId = -1
val hostName = hostOpt.getOrElse("nohost")
val numaInfoOp = hostToNumaInfo.get(hostName)
numaInfoOp match {
case Some(numaInfo) =>
numaNodeId = numaInfo.cotainer2numa.get(containerId.toString).getOrElse(-1)
if(-1 != numaNodeId) numaInfo.numaUsed(numaNodeId) = false
case _ => -1
}

(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)
". Diagnostics: " + completedContainer.getDiagnostics +
". numaNodeId: " + numaNodeId +
". hostName: " + hostName)

}
if (exitCausedByApp) {
Expand Down
6 changes: 6 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ package object config {
.stringConf
.createOptional

// Whether executors launched on YARN are wrapped with numactl for numa binding.
// Doc text fixed for grammar ("Whether enabling ... when executor start up",
// "This is recommend to true") — this string is user-facing configuration help.
private[spark] val SPARK_YARN_NUMA_ENABLED = ConfigBuilder("spark.yarn.numa.enabled")
  .doc("Whether to enable numa binding when an executor starts up. Setting this to " +
    "true is recommended when persistent memory is enabled.")
  .booleanConf
  .createWithDefault(false)

/* Cluster-mode launcher configuration. */

private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
Expand Down