Commit 47f54b1

dongjoon-hyun authored and dbtsai committed
[SPARK-28118][CORE] Add spark.eventLog.compression.codec configuration
## What changes were proposed in this pull request?

Event logs differ from other Spark data in terms of lifetime, so it is useful to have a dedicated compression configuration for them, `spark.eventLog.compression.codec`. This PR adds the new configuration as an optional one: if `spark.eventLog.compression.codec` is not given, `spark.io.compression.codec` is used.

## How was this patch tested?

Passes Jenkins with the newly added test case.

Closes apache#24921 from dongjoon-hyun/SPARK-28118.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
1 parent d98a5ce commit 47f54b1
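In user code the new knob is set like any other Spark configuration. A minimal sketch of the behavior this commit adds (master, app name, and log directory below are illustrative, not part of the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: compress event logs with zstd while the rest of Spark's I/O
// keeps using spark.io.compression.codec (lz4 by default).
val conf = new SparkConf()
  .setMaster("local[2]")                            // illustrative
  .setAppName("event-log-codec-demo")               // illustrative
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.compress", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events")   // illustrative; must exist
  .set("spark.eventLog.compression.codec", "zstd")  // new in this commit

val sc = new SparkContext(conf)
// ... run the application; the event log written under /tmp/spark-events
// should carry a ".zstd" suffix reflecting the chosen codec ...
sc.stop()
```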

File tree

4 files changed (+34, −4 lines)

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -1180,6 +1180,13 @@ package object config {
       .intConf
       .createWithDefault(1)
 
+  private[spark] val EVENT_LOG_COMPRESSION_CODEC =
+    ConfigBuilder("spark.eventLog.compression.codec")
+      .doc("The codec used to compress event log. By default, Spark provides four codecs: " +
+        "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " +
+        "the codec. If this is not given, spark.io.compression.codec will be used.")
+      .fallbackConf(IO_COMPRESSION_CODEC)
+
   private[spark] val BUFFER_SIZE =
     ConfigBuilder("spark.buffer.size")
       .intConf
```
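Because the new entry is declared with `.fallbackConf(IO_COMPRESSION_CODEC)`, reading it returns the I/O codec unless the event-log codec was set explicitly. A minimal sketch of that resolution, mirroring the test added below; note the typed `get`/`set` overloads and the `config` package are `private[spark]`, so this only compiles inside Spark's own source tree:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

val conf = new SparkConf()
conf.set(IO_COMPRESSION_CODEC, "snappy")
// Not set explicitly, so the read falls back to spark.io.compression.codec:
assert(conf.get(EVENT_LOG_COMPRESSION_CODEC) == "snappy")

// An explicit value takes precedence over the fallback:
conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
assert(conf.get(EVENT_LOG_COMPRESSION_CODEC) == "zstd")
```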

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 4 additions & 2 deletions
```diff
@@ -44,6 +44,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  *   spark.eventLog.enabled - Whether event logging is enabled.
  *   spark.eventLog.logBlockUpdates.enabled - Whether to log block updates
  *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.compression.codec - The codec to compress logged events
  *   spark.eventLog.overwrite - Whether to overwrite any existing files.
  *   spark.eventLog.dir - Path to the directory in which events are logged.
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
@@ -73,11 +74,12 @@ private[spark] class EventLoggingListener(
   private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
-      Some(CompressionCodec.createCodec(sparkConf))
+      Some(CompressionCodec.createCodec(sparkConf, sparkConf.get(EVENT_LOG_COMPRESSION_CODEC)))
     } else {
       None
     }
-  private val compressionCodecName = compressionCodec.map { c =>
+  // Visible for tests only.
+  private[scheduler] val compressionCodecName = compressionCodec.map { c =>
     CompressionCodec.getShortName(c.getClass.getName)
   }
 
```
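The two-argument `CompressionCodec.createCodec(conf, codecName)` overload used above resolves a short name (or a fully qualified class name) to a codec instance. A small sketch of the round trip the listener performs; like the listener itself, it relies on `private[spark]` API, so it is illustrative rather than user-facing:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

val conf = new SparkConf()
// Resolve the short name to a codec instance (ZStdCompressionCodec here):
val codec = CompressionCodec.createCodec(conf, "zstd")
// Map the implementing class back to the short name recorded for the log:
assert(CompressionCodec.getShortName(codec.getClass.getName) == "zstd")
```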

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 15 additions & 1 deletion
```diff
@@ -87,6 +87,20 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     testEventLogging()
   }
 
+  test("spark.eventLog.compression.codec overrides spark.io.compression.codec") {
+    val conf = new SparkConf
+    conf.set(EVENT_LOG_COMPRESS, true)
+
+    // The default value is `spark.io.compression.codec`.
+    val e = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    assert(e.compressionCodecName.contains("lz4"))
+
+    // `spark.eventLog.compression.codec` overrides `spark.io.compression.codec`.
+    conf.set(EVENT_LOG_COMPRESSION_CODEC, "zstd")
+    val e2 = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
+    assert(e2.compressionCodecName.contains("zstd"))
+  }
+
   test("Basic event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
       testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
@@ -535,7 +549,7 @@ object EventLoggingListenerSuite {
     conf.set(EVENT_LOG_DIR, logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set(EVENT_LOG_COMPRESS, true)
-      conf.set(IO_COMPRESSION_CODEC, codec)
+      conf.set(EVENT_LOG_COMPRESSION_CODEC, codec)
     }
     conf.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
     conf
```

docs/configuration.md

Lines changed: 8 additions & 1 deletion
```diff
@@ -880,7 +880,14 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
     Whether to compress logged events, if <code>spark.eventLog.enabled</code> is true.
-    Compression will use <code>spark.io.compression.codec</code>.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.eventLog.compression.codec</code></td>
+  <td></td>
+  <td>
+    The codec to compress logged events. If this is not given,
+    <code>spark.io.compression.codec</code> will be used.
   </td>
 </tr>
 <tr>
```
