
Commit ba638a7

gss2002 authored and Marcelo Vanzin committed
[SPARK-25778] WriteAheadLogBackedBlockRDD in YARN Cluster Mode Fails due to lack of access to tmpDir from $PWD to HDFS

WriteAheadLogBackedBlockRDD's use of java.io.tmpdir fails if $PWD resolves to a folder in HDFS and the Spark YARN cluster job does not have access to that folder for the dummy directory. This patch provides an option, spark.streaming.receiver.blockStore.tmpdir, to override java.io.tmpdir, which is set from $PWD in YARN cluster mode.

## What changes were proposed in this pull request?

This change provides an option to override java.io.tmpdir so that, when $PWD is resolved in YARN cluster mode, Spark does not attempt to use that folder and instead uses the folder provided via the following option: spark.streaming.receiver.blockStore.tmpdir

## How was this patch tested?

The patch was manually tested on a Spark Streaming job with write-ahead logs in cluster mode.

Closes apache#22867 from gss2002/SPARK-25778.

Authored-by: gss2002 <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
(cherry picked from commit 2b671e7)
Signed-off-by: Marcelo Vanzin <[email protected]>
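For context, the merged diff below replaces the plain absolute path of the dummy WAL directory with a file: URI. A scheme-less path handed to Hadoop is resolved against fs.defaultFS, which in a typical YARN cluster is HDFS, whereas an explicit file: URI keeps the directory on the local filesystem. The following is a minimal standalone sketch, not part of the patch (the object name is made up), showing the two string forms produced by the old and new code:

```scala
import java.io.File
import java.util.UUID

// Hypothetical demo object, not part of Spark: prints the two directory
// strings discussed in this commit.
object TmpDirFormsDemo {
  def main(args: Array[String]): Unit = {
    val dir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString)

    // Plain absolute path, e.g. "/tmp/6f2c..."; with no scheme, Hadoop
    // resolves such a path against fs.defaultFS, which in YARN cluster
    // mode is usually HDFS.
    println(dir.getAbsolutePath)

    // file: URI, e.g. "file:/tmp/6f2c..."; the explicit scheme pins the
    // non-existent dummy directory to the local filesystem.
    println(dir.toURI.toString)
  }
}
```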
1 parent ca426bf commit ba638a7

File tree

1 file changed: 1 addition, 1 deletion


streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -136,7 +136,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
      // this dummy directory should not already exist otherwise the WAL will try to recover
      // past events from the directory and throw errors.
      val nonExistentDirectory = new File(
-       System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
+       System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).toURI.toString
      writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
        SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
      dataRead = writeAheadLog.read(partition.walRecordHandle)
