Commit 6b27ad5

[SPARK-28102][CORE] Avoid performance problems when lz4-java JNI libraries fail to initialize
## What changes were proposed in this pull request?

This PR fixes a performance problem in environments where `lz4-java`'s native JNI libraries fail to initialize.

Spark uses `lz4-java` for LZ4 compression. Under the hood, the `LZ4BlockInputStream` and `LZ4BlockOutputStream` constructors call `LZ4Factory.fastestInstance()`, which attempts to load JNI libraries and falls back on Java implementations when the JNI library cannot be loaded or initialized.

If the LZ4 JNI libraries are present on the library load path (`Native.isLoaded()`) but cannot be initialized (e.g. due to breakage caused by shading), then an exception is thrown and caught, triggering fallback to `fastestJavaInstance()` (a non-JNI implementation). Unfortunately, the LZ4 library does not cache the fact that the JNI library failed to initialize, so every call to `LZ4Factory.fastestInstance()` re-attempts (and fails) to initialize the native code. These initialization attempts are performed in a `static synchronized` method, so exceptions from failures are thrown while holding shared monitors, and this causes monitor-contention performance issues.

Here's an example stack trace showing the problem:

```java
java.lang.Throwable.fillInStackTrace(Native Method)
java.lang.Throwable.fillInStackTrace(Throwable.java:783) => holding Monitor(java.lang.NoClassDefFoundError@441628568})
java.lang.Throwable.<init>(Throwable.java:265)
java.lang.Error.<init>(Error.java:70)
java.lang.LinkageError.<init>(LinkageError.java:55)
java.lang.NoClassDefFoundError.<init>(NoClassDefFoundError.java:59)
shaded.net.jpountz.lz4.LZ4JNICompressor.compress(LZ4JNICompressor.java:36)
shaded.net.jpountz.lz4.LZ4Factory.<init>(LZ4Factory.java:200)
shaded.net.jpountz.lz4.LZ4Factory.instance(LZ4Factory.java:51)
shaded.net.jpountz.lz4.LZ4Factory.nativeInstance(LZ4Factory.java:84) => holding Monitor(java.lang.Class@1475983836})
shaded.net.jpountz.lz4.LZ4Factory.fastestInstance(LZ4Factory.java:157)
shaded.net.jpountz.lz4.LZ4BlockOutputStream.<init>(LZ4BlockOutputStream.java:135)
org.apache.spark.io.LZ4CompressionCodec.compressedOutputStream(CompressionCodec.scala:122)
org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:156)
org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:131)
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:120)
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:211)
[...]
```

To avoid this problem, this PR modifies Spark's `LZ4CompressionCodec` to call `fastestInstance()` itself and cache the result (which is safe because these factories [are thread-safe](lz4/lz4-java#82)).

## How was this patch tested?

Existing unit tests.

Closes apache#24905 from JoshRosen/lz4-factory-flags.

Lead-authored-by: Josh Rosen <[email protected]>
Co-authored-by: Josh Rosen <[email protected]>
Signed-off-by: Josh Rosen <[email protected]>
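The fix boils down to memoizing the result of an expensive, monitor-holding lookup. Here is a minimal, hypothetical Java sketch of that pattern (this is not lz4-java's or Spark's actual code; `naiveFastest`, `initAttempts`, and `CACHED` are invented for illustration): the uncached path re-enters a `static synchronized` method and re-fails on every call, while the cached path pays the cost exactly once.

```java
public class FactoryCachingDemo {
    public static int initAttempts = 0;

    // Mimics the shape of LZ4Factory.fastestInstance() when JNI is broken:
    // each call re-attempts native init while holding the class monitor,
    // throws, catches, and falls back -- without remembering the failure.
    public static synchronized String naiveFastest() {
        initAttempts++;
        try {
            throw new UnsatisfiedLinkError("simulated JNI init failure");
        } catch (Throwable t) {
            return "java-impl"; // fallback (non-JNI) implementation
        }
    }

    // The pattern of the fix: perform the expensive lookup once and reuse the
    // result (safe when the returned factory is thread-safe, as lz4-java's is).
    public static final String CACHED = naiveFastest();

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            String unused = CACHED; // no lock, no exception, no init attempt
        }
        System.out.println("init attempts with cache: " + initAttempts);    // prints 1
        for (int i = 0; i < 1000; i++) {
            naiveFastest(); // uncached: 1000 more monitor-holding failures
        }
        System.out.println("init attempts without cache: " + initAttempts); // prints 1001
    }
}
```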
1 parent fc65e0f commit 6b27ad5

File tree

1 file changed (+25, −3 lines)

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 25 additions & 3 deletions
```diff
@@ -22,7 +22,8 @@ import java.util.Locale
 
 import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
+import net.jpountz.xxhash.XXHashFactory
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -118,14 +119,35 @@ private[spark] object CompressionCodec {
 @DeveloperApi
 class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
 
+  // SPARK-28102: if the LZ4 JNI libraries fail to initialize then `fastestInstance()` calls fall
+  // back to non-JNI implementations but do not remember the fact that JNI failed to load, so
+  // repeated calls to `fastestInstance()` will cause performance problems because the JNI load
+  // will be repeatedly re-attempted and that path is slow because it throws exceptions from a
+  // static synchronized method (causing lock contention). To avoid this problem, we cache the
+  // result of the `fastestInstance()` calls ourselves (both factories are thread-safe).
+  @transient private[this] lazy val lz4Factory: LZ4Factory = LZ4Factory.fastestInstance()
+  @transient private[this] lazy val xxHashFactory: XXHashFactory = XXHashFactory.fastestInstance()
+
+  private[this] val defaultSeed: Int = 0x9747b28c // LZ4BlockOutputStream.DEFAULT_SEED
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
-    new LZ4BlockOutputStream(s, blockSize)
+    val syncFlush = false
+    new LZ4BlockOutputStream(
+      s,
+      blockSize,
+      lz4Factory.fastCompressor(),
+      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
+      syncFlush)
   }
 
   override def compressedInputStream(s: InputStream): InputStream = {
     val disableConcatenationOfByteStream = false
-    new LZ4BlockInputStream(s, disableConcatenationOfByteStream)
+    new LZ4BlockInputStream(
+      s,
+      lz4Factory.fastDecompressor(),
+      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
+      disableConcatenationOfByteStream)
   }
 }
```
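The patch relies on Scala's `@transient lazy val`, which guarantees the factory lookup runs at most once per codec instance even under concurrent access. A rough Java analogue of that semantics (hypothetical, not Spark code; `expensiveLookup` stands in for `LZ4Factory.fastestInstance()`) is the initialization-on-demand holder idiom, where the JVM's class-initialization locking provides the same run-exactly-once, thread-safe guarantee:

```java
public class LazyFactoryHolder {
    // Counts how many times the expensive lookup actually runs.
    public static final java.util.concurrent.atomic.AtomicInteger INITS =
        new java.util.concurrent.atomic.AtomicInteger();

    // The JVM runs Holder's static initializer exactly once, on first use,
    // under the class-init lock -- analogous to Scala's `lazy val`.
    private static final class Holder {
        static final String FACTORY = expensiveLookup();
    }

    static String expensiveLookup() {
        INITS.incrementAndGet(); // stands in for LZ4Factory.fastestInstance()
        return "fastest-factory";
    }

    public static String factory() { return Holder.FACTORY; }

    public static void main(String[] args) throws Exception {
        Runnable r = () -> factory();
        Thread t1 = new Thread(r), t2 = new Thread(r);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println("initializations: " + INITS.get()); // prints 1
    }
}
```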
