Skip to content

Commit 7ad18ee

Browse files
rdblueMarcelo Vanzin
authored andcommitted
[SPARK-25004][CORE] Add spark.executor.pyspark.memory limit.
## What changes were proposed in this pull request? This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, we see fewer cases of Python taking too much memory because it doesn't know to run garbage collection. This results in YARN killing fewer containers. This also improves error messages so users know that Python is consuming too much memory: ``` File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer fe_eval_rec.update(f(src_rec_prep, mat_rec_prep)) File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, [])) File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare permutations = sorted(permutations, reverse=True) MemoryError ``` The new pyspark memory setting is used to increase requested YARN container memory, instead of sharing overhead memory between python and off-heap JVM activity. ## How was this patch tested? Tested memory limits in our YARN cluster and verified that MemoryError is thrown. Author: Ryan Blue <[email protected]> Closes apache#21977 from rdblue/SPARK-25004-add-python-memory-limit.
1 parent aff8f15 commit 7ad18ee

File tree

18 files changed

+105
-65
lines changed

18 files changed

+105
-65
lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@ private[spark] class PythonRDD(
4949
isFromBarrier: Boolean = false)
5050
extends RDD[Array[Byte]](parent) {
5151

52-
val bufferSize = conf.getInt("spark.buffer.size", 65536)
53-
val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
54-
5552
override def getPartitions: Array[Partition] = firstParent.partitions
5653

5754
override val partitioner: Option[Partitioner] = {
@@ -61,7 +58,7 @@ private[spark] class PythonRDD(
6158
val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
6259

6360
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
64-
val runner = PythonRunner(func, bufferSize, reuseWorker)
61+
val runner = PythonRunner(func)
6562
runner.compute(firstParent.iterator(split, context), split.index, context)
6663
}
6764

core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
2727

2828
import org.apache.spark._
2929
import org.apache.spark.internal.Logging
30+
import org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY
3031
import org.apache.spark.security.SocketAuthHelper
3132
import org.apache.spark.util._
3233

@@ -62,14 +63,20 @@ private[spark] object PythonEvalType {
6263
*/
6364
private[spark] abstract class BasePythonRunner[IN, OUT](
6465
funcs: Seq[ChainedPythonFunctions],
65-
bufferSize: Int,
66-
reuseWorker: Boolean,
6766
evalType: Int,
6867
argOffsets: Array[Array[Int]])
6968
extends Logging {
7069

7170
require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
7271

72+
private val conf = SparkEnv.get.conf
73+
private val bufferSize = conf.getInt("spark.buffer.size", 65536)
74+
private val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
75+
// each python worker gets an equal part of the allocation. the worker pool will grow to the
76+
// number of concurrent tasks, which is determined by the number of cores in this executor.
77+
private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
78+
.map(_ / conf.getInt("spark.executor.cores", 1))
79+
7380
// All the Python functions should have the same exec, version and envvars.
7481
protected val envVars = funcs.head.funcs.head.envVars
7582
protected val pythonExec = funcs.head.funcs.head.pythonExec
@@ -82,7 +89,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
8289
private[spark] var serverSocket: Option[ServerSocket] = None
8390

8491
// Authentication helper used when serving method calls via socket from Python side.
85-
private lazy val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
92+
private lazy val authHelper = new SocketAuthHelper(conf)
8693

8794
def compute(
8895
inputIterator: Iterator[IN],
@@ -95,6 +102,9 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
95102
if (reuseWorker) {
96103
envVars.put("SPARK_REUSE_WORKER", "1")
97104
}
105+
if (memoryMb.isDefined) {
106+
envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", memoryMb.get.toString)
107+
}
98108
val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
99109
// Whether is the worker released into idle pool
100110
val released = new AtomicBoolean(false)
@@ -485,20 +495,17 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
485495

486496
private[spark] object PythonRunner {
487497

488-
def apply(func: PythonFunction, bufferSize: Int, reuseWorker: Boolean): PythonRunner = {
489-
new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))), bufferSize, reuseWorker)
498+
def apply(func: PythonFunction): PythonRunner = {
499+
new PythonRunner(Seq(ChainedPythonFunctions(Seq(func))))
490500
}
491501
}
492502

493503
/**
494504
* A helper class to run Python mapPartition in Spark.
495505
*/
496-
private[spark] class PythonRunner(
497-
funcs: Seq[ChainedPythonFunctions],
498-
bufferSize: Int,
499-
reuseWorker: Boolean)
506+
private[spark] class PythonRunner(funcs: Seq[ChainedPythonFunctions])
500507
extends BasePythonRunner[Array[Byte], Array[Byte]](
501-
funcs, bufferSize, reuseWorker, PythonEvalType.NON_UDF, Array(Array(0))) {
508+
funcs, PythonEvalType.NON_UDF, Array(Array(0))) {
502509

503510
protected override def newWriterThread(
504511
env: SparkEnv,

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ package object config {
114114
.checkValue(_ >= 0, "The off-heap memory size must not be negative")
115115
.createWithDefault(0)
116116

117+
private[spark] val PYSPARK_EXECUTOR_MEMORY = ConfigBuilder("spark.executor.pyspark.memory")
118+
.bytesConf(ByteUnit.MiB)
119+
.createOptional
120+
117121
private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
118122
.booleanConf.createWithDefault(false)
119123

docs/configuration.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,18 @@ of the most common options to set are:
179179
(e.g. <code>2g</code>, <code>8g</code>).
180180
</td>
181181
</tr>
182+
<tr>
183+
<td><code>spark.executor.pyspark.memory</code></td>
184+
<td>Not set</td>
185+
<td>
186+
The amount of memory to be allocated to PySpark in each executor, in MiB
187+
unless otherwise specified. If set, PySpark memory for an executor will be
188+
limited to this amount. If not set, Spark will not limit Python's memory use
189+
and it is up to the application to avoid exceeding the overhead memory space
190+
shared with other non-JVM processes. When PySpark is run in YARN, this memory
191+
is added to executor resource requests.
192+
</td>
193+
</tr>
182194
<tr>
183195
<td><code>spark.executor.memoryOverhead</code></td>
184196
<td>executorMemory * 0.10, with minimum of 384 </td>

python/pyspark/worker.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import os
2323
import sys
2424
import time
25+
import resource
2526
import socket
2627
import traceback
2728

@@ -263,6 +264,28 @@ def main(infile, outfile):
263264
isBarrier = read_bool(infile)
264265
boundPort = read_int(infile)
265266
secret = UTF8Deserializer().loads(infile)
267+
268+
# set up memory limits
269+
memory_limit_mb = int(os.environ.get('PYSPARK_EXECUTOR_MEMORY_MB', "-1"))
270+
total_memory = resource.RLIMIT_AS
271+
try:
272+
if memory_limit_mb > 0:
273+
(soft_limit, hard_limit) = resource.getrlimit(total_memory)
274+
msg = "Current mem limits: {0} of max {1}\n".format(soft_limit, hard_limit)
275+
print(msg, file=sys.stderr)
276+
277+
# convert to bytes
278+
new_limit = memory_limit_mb * 1024 * 1024
279+
280+
if soft_limit == resource.RLIM_INFINITY or new_limit < soft_limit:
281+
msg = "Setting mem limits to {0} of max {1}\n".format(new_limit, new_limit)
282+
print(msg, file=sys.stderr)
283+
resource.setrlimit(total_memory, (new_limit, new_limit))
284+
285+
except (resource.error, OSError, ValueError) as e:
286+
# not all systems support resource limits, so warn instead of failing
287+
print("WARN: Failed to set memory limit: {0}\n".format(e), file=sys.stderr)
288+
266289
# initialize global state
267290
taskContext = None
268291
if isBarrier:

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ private[spark] class Client(
9191
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
9292
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
9393

94+
private val isPython = sparkConf.get(IS_PYTHON_APP)
95+
private val pysparkWorkerMemory: Int = if (isPython) {
96+
sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
97+
} else {
98+
0
99+
}
100+
94101
private val distCacheMgr = new ClientDistributedCacheManager()
95102

96103
private val principal = sparkConf.get(PRINCIPAL).orNull
@@ -333,12 +340,12 @@ private[spark] class Client(
333340
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
334341
logInfo("Verifying our application has not requested more than the maximum " +
335342
s"memory capability of the cluster ($maxMem MB per container)")
336-
val executorMem = executorMemory + executorMemoryOverhead
343+
val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
337344
if (executorMem > maxMem) {
338-
throw new IllegalArgumentException(s"Required executor memory ($executorMemory" +
339-
s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
340-
"Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
341-
"'yarn.nodemanager.resource.memory-mb'.")
345+
throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " +
346+
s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
347+
s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
348+
s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
342349
}
343350
val amMem = amMemory + amMemoryOverhead
344351
if (amMem > maxMem) {

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,17 @@ private[yarn] class YarnAllocator(
133133
// Additional memory overhead.
134134
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
135135
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
136+
protected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) {
137+
sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
138+
} else {
139+
0
140+
}
136141
// Number of cores per executor.
137142
protected val executorCores = sparkConf.get(EXECUTOR_CORES)
138143
// Resource capability requested for each executors
139-
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
144+
private[yarn] val resource = Resource.newInstance(
145+
executorMemory + memoryOverhead + pysparkWorkerMemory,
146+
executorCores)
140147

141148
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
142149
"ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ abstract class BaseYarnClusterSuite
133133
extraClassPath: Seq[String] = Nil,
134134
extraJars: Seq[String] = Nil,
135135
extraConf: Map[String, String] = Map(),
136-
extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
136+
extraEnv: Map[String, String] = Map(),
137+
outFile: Option[File] = None): SparkAppHandle.State = {
137138
val deployMode = if (clientMode) "client" else "cluster"
138139
val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
139140
val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
@@ -161,6 +162,11 @@ abstract class BaseYarnClusterSuite
161162
}
162163
extraJars.foreach(launcher.addJar)
163164

165+
if (outFile.isDefined) {
166+
launcher.redirectOutput(outFile.get)
167+
launcher.redirectError()
168+
}
169+
164170
val handle = launcher.startApplication()
165171
try {
166172
eventually(timeout(2 minutes), interval(1 second)) {
@@ -179,17 +185,22 @@ abstract class BaseYarnClusterSuite
179185
* the tests enforce that something is written to a file after everything is ok to indicate
180186
* that the job succeeded.
181187
*/
182-
protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = {
183-
checkResult(finalState, result, "success")
184-
}
185-
186188
protected def checkResult(
187189
finalState: SparkAppHandle.State,
188190
result: File,
189-
expected: String): Unit = {
190-
finalState should be (SparkAppHandle.State.FINISHED)
191+
expected: String = "success",
192+
outFile: Option[File] = None): Unit = {
193+
// the context message is passed to assert as Any instead of a function. to lazily load the
194+
// output from the file, this passes an anonymous object that loads it in toString when building
195+
// an error message
196+
val output = new Object() {
197+
override def toString: String = outFile
198+
.map(Files.toString(_, StandardCharsets.UTF_8))
199+
.getOrElse("(stdout/stderr was not captured)")
200+
}
201+
assert(finalState === SparkAppHandle.State.FINISHED, output)
191202
val resultString = Files.toString(result, StandardCharsets.UTF_8)
192-
resultString should be (expected)
203+
assert(resultString === expected, output)
193204
}
194205

195206
protected def mainClassName(klass: Class[_]): String = {

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,13 +282,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
282282
val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir)
283283
val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
284284
val result = File.createTempFile("result", null, tempDir)
285+
val outFile = Some(File.createTempFile("stdout", null, tempDir))
285286

286287
val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(),
287288
sparkArgs = Seq("--py-files" -> pyFiles),
288289
appArgs = Seq(result.getAbsolutePath()),
289290
extraEnv = extraEnvVars,
290-
extraConf = extraConf)
291-
checkResult(finalState, result)
291+
extraConf = extraConf,
292+
outFile = outFile)
293+
checkResult(finalState, result, outFile = outFile)
292294
}
293295

294296
private def testUseClassPathFirst(clientMode: Boolean): Unit = {

sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ case class AggregateInPandasExec(
7979
override protected def doExecute(): RDD[InternalRow] = {
8080
val inputRDD = child.execute()
8181

82-
val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
83-
val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
8482
val sessionLocalTimeZone = conf.sessionLocalTimeZone
8583
val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
8684

@@ -137,8 +135,6 @@ case class AggregateInPandasExec(
137135

138136
val columnarBatchIter = new ArrowPythonRunner(
139137
pyFuncs,
140-
bufferSize,
141-
reuseWorker,
142138
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
143139
argOffsets,
144140
aggInputSchema,

0 commit comments

Comments
 (0)