Skip to content

Commit af3b816

Browse files
squito (Imran Rashid) authored and Marcelo Vanzin committed
[SPARK-25855][CORE] Don't use erasure coding for event logs by default
## What changes were proposed in this pull request? This turns off hdfs erasure coding by default for event logs, regardless of filesystem defaults. Because this requires apis only available in hadoop 3, this uses reflection. EC isn't a very good choice for event logs, as hflush() is a no-op, and so updates to the file are not visible for a long time. This can still be configured by setting "spark.eventLog.allowErasureCoding=true", which will use filesystem defaults. ## How was this patch tested? deployed a cluster with the changes with HDFS EC on. By default, event logs didn't use EC, but configuration still would allow EC. Also tried writing to the local fs (which doesn't support EC at all) and things worked fine. Closes apache#22881 from squito/SPARK-25855. Authored-by: Imran Rashid <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent e4cb42a commit af3b816

File tree

4 files changed

+53
-2
lines changed

4 files changed

+53
-2
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.deploy
1919

2020
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
21+
import java.lang.reflect.Method
2122
import java.security.PrivilegedExceptionAction
2223
import java.text.DateFormat
2324
import java.util.{Arrays, Comparator, Date, Locale}
@@ -30,7 +31,7 @@ import scala.util.control.NonFatal
3031

3132
import com.google.common.primitives.Longs
3233
import org.apache.hadoop.conf.Configuration
33-
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
34+
import org.apache.hadoop.fs._
3435
import org.apache.hadoop.mapred.JobConf
3536
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
3637
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -471,4 +472,33 @@ object SparkHadoopUtil {
471472
hadoopConf.set(key.substring("spark.hadoop.".length), value)
472473
}
473474
}
475+
476+
// scalastyle:off line.size.limit
477+
/**
478+
* Create a path that uses replication instead of erasure coding (ec), regardless of the default
479+
* configuration in hdfs for the given path. This can be helpful as hdfs ec doesn't support
480+
* hflush(), hsync(), or append()
481+
* https://hadoop.apache.org/docs/r3.0.0/hadoop-project-dist/hadoop-hdfs/HDFSErasureCoding.html#Limitations
482+
*/
483+
// scalastyle:on line.size.limit
484+
def createNonECFile(fs: FileSystem, path: Path): FSDataOutputStream = {
485+
try {
486+
// Use reflection as this uses apis only available in hadoop 3
487+
val builderMethod = fs.getClass().getMethod("createFile", classOf[Path])
488+
val builder = builderMethod.invoke(fs, path)
489+
val builderCls = builder.getClass()
490+
// this may throw a NoSuchMethodException if the path is not on hdfs
491+
val replicateMethod = builderCls.getMethod("replicate")
492+
val buildMethod = builderCls.getMethod("build")
493+
val b2 = replicateMethod.invoke(builder)
494+
buildMethod.invoke(b2).asInstanceOf[FSDataOutputStream]
495+
} catch {
496+
case _: NoSuchMethodException =>
497+
// No createFile() method, we're using an older hdfs client, which doesn't give us control
498+
// over EC vs. replication. Older hdfs doesn't have EC anyway, so just create a file with
499+
// old apis.
500+
fs.create(path)
501+
}
502+
}
503+
474504
}

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ package object config {
5858
.booleanConf
5959
.createWithDefault(false)
6060

61+
private[spark] val EVENT_LOG_ALLOW_EC =
62+
ConfigBuilder("spark.eventLog.allowErasureCoding")
63+
.booleanConf
64+
.createWithDefault(false)
65+
6166
private[spark] val EVENT_LOG_TESTING =
6267
ConfigBuilder("spark.eventLog.testing")
6368
.internal()

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ private[spark] class EventLoggingListener(
6767
private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
6868
private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
6969
private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
70+
private val shouldAllowECLogs = sparkConf.get(EVENT_LOG_ALLOW_EC)
7071
private val shouldLogStageExecutorMetrics = sparkConf.get(EVENT_LOG_STAGE_EXECUTOR_METRICS)
7172
private val testing = sparkConf.get(EVENT_LOG_TESTING)
7273
private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
@@ -119,7 +120,11 @@ private[spark] class EventLoggingListener(
119120
if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
120121
new FileOutputStream(uri.getPath)
121122
} else {
122-
hadoopDataStream = Some(fileSystem.create(path))
123+
hadoopDataStream = Some(if (shouldAllowECLogs) {
124+
fileSystem.create(path)
125+
} else {
126+
SparkHadoopUtil.createNonECFile(fileSystem, path)
127+
})
123128
hadoopDataStream.get
124129
}
125130

docs/configuration.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,17 @@ Apart from these, the following properties are also available, and may be useful
761761
Compression will use <code>spark.io.compression.codec</code>.
762762
</td>
763763
</tr>
764+
<tr>
765+
<td><code>spark.eventLog.allowErasureCoding</code></td>
766+
<td>false</td>
767+
<td>
768+
Whether to allow event logs to use erasure coding, or turn erasure coding off, regardless of
769+
filesystem defaults. On HDFS, erasure coded files will not update as quickly as regular
770+
replicated files, so the application updates will take longer to appear in the History Server.
771+
Note that even if this is true, Spark will still not force the file to use erasure coding, it
772+
will simply use filesystem defaults.
773+
</td>
774+
</tr>
764775
<tr>
765776
<td><code>spark.eventLog.dir</code></td>
766777
<td>file:///tmp/spark-events</td>

0 commit comments

Comments
 (0)