Skip to content

Commit 574ef6c

Browse files
caneGuy and cloud-fan
authored and committed
[SPARK-21527][CORE] Use buffer limit in order to use JAVA NIO Util's buffer cache
## What changes were proposed in this pull request?

Right now, `ChunkedByteBuffer#writeFully` does not slice the bytes first. Observe the code in Java NIO's `Util#getTemporaryDirectBuffer` below:

    BufferCache cache = bufferCache.get();
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        // No suitable buffer in the cache so we need to allocate a new
        // one. To avoid the cache growing then we remove the first
        // buffer from the cache and free it.
        if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
        }
        return ByteBuffer.allocateDirect(size);
    }

If we slice first with a fixed size, we can use the buffer cache and only need to allocate at the first write call. Since we otherwise allocate a new buffer each time, we cannot control when that buffer is freed; this once caused a memory issue in our production cluster. In this patch, I supply a new API which slices with a fixed size for buffer writing.

## How was this patch tested?

Unit test and test in production.

Author: zhoukang <[email protected]>
Author: zhoukang <[email protected]>

Closes apache#18730 from caneGuy/zhoukang/improve-chunkwrite.
1 parent 7d16776 commit 574ef6c

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,15 @@ package object config {
293293
.booleanConf
294294
.createWithDefault(false)
295295

296+
private[spark] val BUFFER_WRITE_CHUNK_SIZE =
297+
ConfigBuilder("spark.buffer.write.chunkSize")
298+
.internal()
299+
.doc("The chunk size during writing out the bytes of ChunkedByteBuffer.")
300+
.bytesConf(ByteUnit.BYTE)
301+
.checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
302+
" ChunkedByteBuffer should not larger than Int.MaxValue.")
303+
.createWithDefault(64 * 1024 * 1024)
304+
296305
private[spark] val CHECKPOINT_COMPRESS =
297306
ConfigBuilder("spark.checkpoint.compress")
298307
.doc("Whether to compress RDD checkpoints. Generally a good idea. Compression will use " +

core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import java.nio.channels.WritableByteChannel
2424
import com.google.common.primitives.UnsignedBytes
2525
import io.netty.buffer.{ByteBuf, Unpooled}
2626

27+
import org.apache.spark.SparkEnv
28+
import org.apache.spark.internal.config
2729
import org.apache.spark.network.util.ByteArrayWritableChannel
2830
import org.apache.spark.storage.StorageUtils
2931

@@ -40,6 +42,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
4042
require(chunks != null, "chunks must not be null")
4143
require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
4244

45+
// Chunk size in bytes
46+
private val bufferWriteChunkSize =
47+
Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
48+
.getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt
49+
4350
private[this] var disposed: Boolean = false
4451

4552
/**
@@ -56,7 +63,9 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
5663
*/
5764
def writeFully(channel: WritableByteChannel): Unit = {
5865
for (bytes <- getChunks()) {
59-
while (bytes.remaining > 0) {
66+
while (bytes.remaining() > 0) {
67+
val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
68+
bytes.limit(bytes.position + ioSize)
6069
channel.write(bytes)
6170
}
6271
}

0 commit comments

Comments (0)