Skip to content

Commit 51e2b38

Browse files
Hieu Huynhtgravescs
authored andcommitted
[SPARK-24992][CORE] spark should randomize yarn local dir selection
**Description: [SPARK-24992](https://issues.apache.org/jira/browse/SPARK-24992)** Utils.getLocalDir is used to get path of a temporary directory. However, it always returns the the same directory, which is the first element in the array localRootDirs. When running on YARN, this might causes the case that we always write to one disk, which makes it busy while other disks are free. We should randomize the selection to spread out the loads. **What changes were proposed in this pull request?** This PR randomized the selection of local directory inside the method Utils.getLocalDir. This change affects the Utils.fetchFile method since it based on the fact that Utils.getLocalDir always return the same directory to cache file. Therefore, a new variable cachedLocalDir is used to cache the first localDirectory that it gets from Utils.getLocalDir. Also, when getting the configured local directories (inside Utils. getConfiguredLocalDirs), in case we are in yarn mode, the array of directories are also randomized before return. Author: Hieu Huynh <“[email protected]”> Closes apache#21953 from hthuynh2/SPARK_24992.
1 parent 1a5e460 commit 51e2b38

File tree

1 file changed

+17
-4
lines changed

1 file changed

+17
-4
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ private[spark] object Utils extends Logging {
8383
val random = new Random()
8484

8585
private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
86+
@volatile private var cachedLocalDir: String = ""
8687

8788
/**
8889
* Define a default value for driver memory here since this value is referenced across the code
@@ -462,7 +463,15 @@ private[spark] object Utils extends Logging {
462463
if (useCache && fetchCacheEnabled) {
463464
val cachedFileName = s"${url.hashCode}${timestamp}_cache"
464465
val lockFileName = s"${url.hashCode}${timestamp}_lock"
465-
val localDir = new File(getLocalDir(conf))
466+
// Set the cachedLocalDir for the first time and re-use it later
467+
if (cachedLocalDir.isEmpty) {
468+
this.synchronized {
469+
if (cachedLocalDir.isEmpty) {
470+
cachedLocalDir = getLocalDir(conf)
471+
}
472+
}
473+
}
474+
val localDir = new File(cachedLocalDir)
466475
val lockFile = new File(localDir, lockFileName)
467476
val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
468477
// Only one executor entry.
@@ -767,13 +776,17 @@ private[spark] object Utils extends Logging {
767776
* - Otherwise, this will return java.io.tmpdir.
768777
*
769778
* Some of these configuration options might be lists of multiple paths, but this method will
770-
* always return a single directory.
779+
* always return a single directory. The return directory is chosen randomly from the array
780+
* of directories it gets from getOrCreateLocalRootDirs.
771781
*/
772782
def getLocalDir(conf: SparkConf): String = {
773-
getOrCreateLocalRootDirs(conf).headOption.getOrElse {
783+
val localRootDirs = getOrCreateLocalRootDirs(conf)
784+
if (localRootDirs.isEmpty) {
774785
val configuredLocalDirs = getConfiguredLocalDirs(conf)
775786
throw new IOException(
776787
s"Failed to get a temp directory under [${configuredLocalDirs.mkString(",")}].")
788+
} else {
789+
localRootDirs(scala.util.Random.nextInt(localRootDirs.length))
777790
}
778791
}
779792

@@ -815,7 +828,7 @@ private[spark] object Utils extends Logging {
815828
// to what Yarn on this system said was available. Note this assumes that Yarn has
816829
// created the directories already, and that they are secured so that only the
817830
// user has access to them.
818-
getYarnLocalDirs(conf).split(",")
831+
randomizeInPlace(getYarnLocalDirs(conf).split(","))
819832
} else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
820833
conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
821834
} else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {

0 commit comments

Comments
 (0)