diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 1296386..dc822de 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -174,6 +174,7 @@ object SparkEnv extends Logging {
     create(
       conf,
       SparkContext.DRIVER_IDENTIFIER,
+      None,
       bindAddress,
       advertiseAddress,
       port,
@@ -192,6 +193,7 @@ object SparkEnv extends Logging {
   private[spark] def createExecutorEnv(
       conf: SparkConf,
       executorId: String,
+      numaNodeId: Option[Int],
       hostname: String,
       port: Int,
       numCores: Int,
@@ -200,6 +202,7 @@ object SparkEnv extends Logging {
     val env = create(
       conf,
       executorId,
+      numaNodeId,
       hostname,
       hostname,
       port,
@@ -214,9 +217,11 @@ object SparkEnv extends Logging {
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
    */
+  // scalastyle:off
   private def create(
       conf: SparkConf,
       executorId: String,
+      numaNodeId: Option[Int],
       bindAddress: String,
       advertiseAddress: String,
       port: Int,
@@ -225,7 +230,7 @@ object SparkEnv extends Logging {
       ioEncryptionKey: Option[Array[Byte]],
       listenerBus: LiveListenerBus = null,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
-
+    // scalastyle:on
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

     // Listener bus is only used on the driver
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 92a2790..e70bb63 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -177,6 +177,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
   private def run(
       driverUrl: String,
       executorId: String,
+      numaNodeId: Option[Int],
       hostname: String,
       cores: Int,
       appId: String,
@@ -221,7 +222,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }

       val env = SparkEnv.createExecutorEnv(
-        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
+        driverConf, executorId, numaNodeId, hostname, port,
+        cores, cfg.ioEncryptionKey, isLocal = false)

       env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
         env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
@@ -239,6 +241,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var hostname: String = null
     var cores: Int = 0
     var appId: String = null
+    var numaNodeId: Option[Int] = None
     var workerUrl: Option[String] = None
     val userClassPath = new mutable.ListBuffer[URL]()
@@ -260,6 +263,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
        case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
+        case ("--numa-node-id") :: value :: tail =>
+          numaNodeId = Some(value.trim.toInt)
+          argv = tail
        case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
@@ -281,7 +287,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
      printUsageAndExit()
    }

-    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
+    run(driverUrl, executorId, numaNodeId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }
@@ -295,6 +301,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
      |   --driver-url <driverUrl>
      |   --executor-id <executorId>
      |   --hostname <hostname>
+      |   --numa-node-id <numaNodeId>
      |   --cores <cores>
      |   --app-id <appid>
      |   --worker-url <workerUrl>
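Note: the backend consumes --numa-node-id pairwise from the argument list, exactly like the existing flags, so its position relative to the other options does not matter. A self-contained sketch of that parsing pattern (the NumaArgSketch object and its parse helper are illustrative only, not part of the patch):

import scala.annotation.tailrec

object NumaArgSketch {
  // Walk the argument list two tokens at a time, keeping the last
  // --numa-node-id value seen; every other flag is skipped here.
  @tailrec
  def parse(argv: List[String], numaNodeId: Option[Int] = None): Option[Int] = argv match {
    case "--numa-node-id" :: value :: tail => parse(tail, Some(value.trim.toInt))
    case _ :: tail => parse(tail, numaNodeId)
    case Nil => numaNodeId
  }

  def main(args: Array[String]): Unit = {
    // Prints Some(1)
    println(parse(List("--hostname", "host0", "--numa-node-id", "1", "--cores", "4")))
  }
}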
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index aabae14..8fce657 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -352,7 +352,7 @@ private[spark] class ApplicationMaster(
     val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
     val executorCores = sparkConf.get(EXECUTOR_CORES)
     val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "",
-      "", executorMemory, executorCores, appId, securityMgr, localResources)
+      "", None, executorMemory, executorCores, appId, securityMgr, localResources)
     dummyRunner.launchContextDebugInfo()
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 8e0533f..7357860 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -50,6 +50,7 @@ private[yarn] class ExecutorRunnable(
    masterAddress: String,
    executorId: String,
    hostname: String,
+    numaNodeId: Option[Int],
    executorMemory: Int,
    executorCores: Int,
    appId: String,
@@ -206,9 +207,21 @@ private[yarn] class ExecutorRunnable(
        Seq("--user-class-path", "file:" + absPath)
      }.toSeq

+    val numaEnabled = sparkConf.get(SPARK_YARN_NUMA_ENABLED)
+    // NUMA binding is not needed for the driver, so skip it when the executorId
+    // is empty (the dummy runner used for launch-context debug info) or when no
+    // NUMA node was assigned.
+    val (numaCtlCommand, numaNodeOpts) =
+      if (numaEnabled && executorId.nonEmpty && numaNodeId.nonEmpty) {
+        val command = s"numactl --cpubind=${numaNodeId.get} --membind=${numaNodeId.get} "
+        (command, Seq("--numa-node-id", numaNodeId.get.toString))
+      } else {
+        ("", Nil)
+      }
+
    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
    val commands = prefixEnv ++ Seq(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
+      numaCtlCommand + Environment.JAVA_HOME.$$() + "/bin/java",
      "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
@@ -217,10 +230,12 @@ private[yarn] class ExecutorRunnable(
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId) ++
+      numaNodeOpts ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
+    logInfo(s"Executor launch command: $commands")

    // TODO: it would be nicer to just make sure there are no null commands here
    commands.map(s => if (s == null) "null" else s).toList
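With spark.yarn.numa.enabled set and a node assigned, the launch command is prefixed so the executor JVM starts as, for example, numactl --cpubind=1 --membind=1 {{JAVA_HOME}}/bin/java -server ..., pinning both CPU scheduling and memory allocation to that node (numactl must be installed on the NodeManager hosts). A minimal sketch of the prefix logic in isolation (hypothetical object; it mirrors, but is not, the patch code):

object NumactlPrefixSketch {
  // Returns the numactl prefix for an executor, or "" when binding is
  // disabled, no node was assigned, or this is the driver-side dummy
  // runner (empty executorId).
  def numactlPrefix(numaEnabled: Boolean, executorId: String, numaNodeId: Option[Int]): String =
    numaNodeId match {
      case Some(id) if numaEnabled && executorId.nonEmpty =>
        s"numactl --cpubind=$id --membind=$id "
      case _ => ""
    }

  def main(args: Array[String]): Unit = {
    // Prints: numactl --cpubind=1 --membind=1 {{JAVA_HOME}}/bin/java
    println(numactlPrefix(numaEnabled = true, executorId = "1", Some(1)) +
      "{{JAVA_HOME}}/bin/java")
  }
}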
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0b66d1c..6452b73 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -173,6 +173,14 @@ private[yarn] class YarnAllocator(
  private[yarn] val containerPlacementStrategy =
    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)

+  // Per-host NUMA bookkeeping: which NUMA node each container is bound to,
+  // and which nodes are currently in use. Two NUMA nodes per host are assumed.
+  private[yarn] case class NumaInfo(
+      container2numa: mutable.HashMap[String, Int],
+      numaUsed: Array[Boolean])
+
+  private[yarn] val hostToNumaInfo = new mutable.HashMap[String, NumaInfo]()
+
  /**
   * Use a different clock for YarnAllocator. This is mainly used for testing.
   */
@@ -489,6 +497,21 @@ private[yarn] class YarnAllocator(
    for (container <- containersToUse) {
      executorIdCounter += 1
      val executorHostname = container.getNodeId.getHost
+
+      // Bind the executor to the first free NUMA node on its host; -1 means
+      // every node is already taken and no binding will be done.
+      // TODO: This is ugly; NUMA assignment should really be handled by the
+      // resource manager (such as YARN).
+      val numaInfo = hostToNumaInfo.getOrElseUpdate(executorHostname,
+        NumaInfo(new mutable.HashMap[String, Int], Array.fill(2)(false)))
+      val numaNodeId = numaInfo.numaUsed.indexWhere(!_)
+      if (numaNodeId >= 0) {
+        numaInfo.numaUsed(numaNodeId) = true
+        numaInfo.container2numa.put(container.getId.toString, numaNodeId)
+      }
+      logInfo(s"Assigned NUMA node $numaNodeId to container ${container.getId} " +
+        s"on host $executorHostname")
+
      val containerId = container.getId
      val executorId = executorIdCounter.toString
      assert(container.getResource.getMemory >= resource.getMemory)
@@ -517,6 +540,7 @@ private[yarn] class YarnAllocator(
            driverUrl,
            executorId,
            executorHostname,
+            if (numaNodeId >= 0) Some(numaNodeId) else None,
            executorMemory,
            executorCores,
            appAttemptId.getApplicationId.toString,
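The per-host bookkeeping above is a small two-slot allocator: take the first free NUMA node on assignment, and clear its flag when the container fails (the following hunk). A condensed standalone model of that assign/release cycle (the NumaSlots class is illustrative only; two nodes per host assumed, as in the patch):

import scala.collection.mutable

class NumaSlots(numNodes: Int = 2) {
  private val used = Array.fill(numNodes)(false)
  private val byContainer = mutable.HashMap[String, Int]()

  // First free node, or None when every node on the host is taken.
  def assign(containerId: String): Option[Int] = {
    val id = used.indexWhere(!_)
    if (id >= 0) {
      used(id) = true
      byContainer(containerId) = id
      Some(id)
    } else None
  }

  // Free the node held by a container, if it held one.
  def release(containerId: String): Unit =
    byContainer.remove(containerId).foreach(id => used(id) = false)
}

object NumaSlotsDemo {
  def main(args: Array[String]): Unit = {
    val slots = new NumaSlots()
    println(slots.assign("c1")) // Some(0)
    println(slots.assign("c2")) // Some(1)
    println(slots.assign("c3")) // None: both nodes taken
    slots.release("c1")
    println(slots.assign("c3")) // Some(0) again after the release
  }
}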
hostName: " + hostName) } if (exitCausedByApp) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index ca8c890..55c4480 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -120,6 +120,12 @@ package object config { .stringConf .createOptional + private[spark] val SPARK_YARN_NUMA_ENABLED = ConfigBuilder("spark.yarn.numa.enabled") + .doc("Whether enabling numa binding when executor start up. This is recommend to true " + + "when persistent memory is enabled.") + .booleanConf + .createWithDefault(false) + /* Cluster-mode launcher configuration. */ private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")