
Commit b8ffb51

Eyal Farago authored and cloud-fan committed
[SPARK-3151][BLOCK MANAGER] DiskStore.getBytes fails for files larger than 2GB
## What changes were proposed in this pull request?

Introduced `DiskBlockData`, a new implementation of `BlockData` representing a whole file. This is somewhat related to [SPARK-6236](https://issues.apache.org/jira/browse/SPARK-6236) as well.

The class follows the implementation of `EncryptedBlockData`, just without the encryption. Hence:

* `toInputStream` is implemented using a `FileInputStream` (todo: the encrypted version actually uses `Channels.newInputStream`; not sure whether that is the right choice here)
* `toNetty` is implemented in terms of `io.netty.channel.DefaultFileRegion`
* `toByteBuffer` fails for files larger than 2GB (the same behavior as the original code, just postponed a bit); it also respects the same configuration keys defined by the original code to choose between memory mapping and a plain file read.

## How was this patch tested?

Added tests to DiskStoreSuite and MemoryManagerSuite.

Author: Eyal Farago <[email protected]>

Closes apache#18855 from eyalfa/SPARK-3151.
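To make the chunking idea concrete, here is a minimal, self-contained sketch of the technique that `toChunkedByteBuffer` relies on: read the file into a sequence of `ByteBuffer`s, each capped at a chunk limit, so no single buffer ever has to hold more than 2GB. The names `ChunkedReadSketch`, `readInChunks`, and `maxChunkBytes` are invented for illustration and are not part of this commit; the real implementation additionally takes the allocator from the caller and reads via `JavaUtils.readFully`.

```scala
import java.io.{EOFException, File, FileInputStream}
import java.nio.ByteBuffer

// Illustration only -- not Spark code. Reads `file` as a sequence of ByteBuffers,
// each at most `maxChunkBytes` large, mirroring the approach DiskBlockData takes
// for blocks that cannot fit into a single 2GB-limited ByteBuffer.
object ChunkedReadSketch {
  def readInChunks(file: File, maxChunkBytes: Int): Seq[ByteBuffer] = {
    val channel = new FileInputStream(file).getChannel
    try {
      var remaining = file.length()
      val chunks = Seq.newBuilder[ByteBuffer]
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxChunkBytes.toLong).toInt
        val chunk = ByteBuffer.allocate(chunkSize)
        // FileChannel.read may return short counts, so loop until the chunk is full.
        while (chunk.hasRemaining) {
          if (channel.read(chunk) < 0) {
            throw new EOFException(s"unexpected end of file: $file")
          }
        }
        chunk.flip()
        chunks += chunk
        remaining -= chunkSize
      }
      chunks.result()
    } finally {
      channel.close()
    }
  }
}
```

For example, `ChunkedReadSketch.readInChunks(someFile, 10 * 1024)` would yield two 10KB buffers for a 20KB file, which is essentially what the new DiskStoreSuite test below asserts.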
1 parent a0345cb · commit b8ffb51

2 files changed: +103 −26 lines


core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 59 additions & 20 deletions
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.mutable.ListBuffer
 
 import com.google.common.io.{ByteStreams, Closeables, Files}
-import io.netty.channel.FileRegion
+import io.netty.channel.{DefaultFileRegion, FileRegion}
 import io.netty.util.AbstractReferenceCounted
 
 import org.apache.spark.{SecurityManager, SparkConf}
@@ -47,6 +47,8 @@ private[spark] class DiskStore(
     securityManager: SecurityManager) extends Logging {
 
   private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
+    Int.MaxValue.toString)
   private val blockSizes = new ConcurrentHashMap[String, Long]()
 
   def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name)
@@ -108,25 +110,7 @@ private[spark] class DiskStore(
         new EncryptedBlockData(file, blockSize, conf, key)
 
       case _ =>
-        val channel = new FileInputStream(file).getChannel()
-        if (blockSize < minMemoryMapBytes) {
-          // For small files, directly read rather than memory map.
-          Utils.tryWithSafeFinally {
-            val buf = ByteBuffer.allocate(blockSize.toInt)
-            JavaUtils.readFully(channel, buf)
-            buf.flip()
-            new ByteBufferBlockData(new ChunkedByteBuffer(buf), true)
-          } {
-            channel.close()
-          }
-        } else {
-          Utils.tryWithSafeFinally {
-            new ByteBufferBlockData(
-              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)), true)
-          } {
-            channel.close()
-          }
-        }
+        new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
     }
   }
 
@@ -165,6 +149,61 @@ private[spark] class DiskStore(
 
 }
 
+private class DiskBlockData(
+    minMemoryMapBytes: Long,
+    maxMemoryMapBytes: Long,
+    file: File,
+    blockSize: Long) extends BlockData {
+
+  override def toInputStream(): InputStream = new FileInputStream(file)
+
+  /**
+   * Returns a Netty-friendly wrapper for the block's data.
+   *
+   * Please see `ManagedBuffer.convertToNetty()` for more details.
+   */
+  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
+
+  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
+    Utils.tryWithResource(open()) { channel =>
+      var remaining = blockSize
+      val chunks = new ListBuffer[ByteBuffer]()
+      while (remaining > 0) {
+        val chunkSize = math.min(remaining, maxMemoryMapBytes)
+        val chunk = allocator(chunkSize.toInt)
+        remaining -= chunkSize
+        JavaUtils.readFully(channel, chunk)
+        chunk.flip()
+        chunks += chunk
+      }
+      new ChunkedByteBuffer(chunks.toArray)
+    }
+  }
+
+  override def toByteBuffer(): ByteBuffer = {
+    require(blockSize < maxMemoryMapBytes,
+      s"can't create a byte buffer of size $blockSize" +
+        s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
+    Utils.tryWithResource(open()) { channel =>
+      if (blockSize < minMemoryMapBytes) {
+        // For small files, directly read rather than memory map.
+        val buf = ByteBuffer.allocate(blockSize.toInt)
+        JavaUtils.readFully(channel, buf)
+        buf.flip()
+        buf
+      } else {
+        channel.map(MapMode.READ_ONLY, 0, file.length)
+      }
+    }
+  }
+
+  override def size: Long = blockSize
+
+  override def dispose(): Unit = {}
+
+  private def open() = new FileInputStream(file).getChannel
+}
+
 private class EncryptedBlockData(
     file: File,
     blockSize: Long,
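A short aside on the new configuration key (an illustration, not part of the diff): `spark.storage.memoryMapLimitForTests` defaults to `Int.MaxValue`, so in normal operation `DiskBlockData` only splits blocks that could not fit in a single `ByteBuffer`. A test can lower the limit so the multi-chunk path is exercised with small data, which is exactly what the suite change below does.

```scala
import org.apache.spark.SparkConf

// Sketch: lower the chunk limit so a 20KB block is served as two 10KB chunks.
// With the default (Int.MaxValue) the limit is only hit by blocks of roughly 2GB and up.
val conf = new SparkConf().set("spark.storage.memoryMapLimitForTests", "10k")
val limit = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests", Int.MaxValue.toString)
// limit == 10 * 1024
```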

core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala

Lines changed: 44 additions & 6 deletions
@@ -50,18 +50,18 @@ class DiskStoreSuite extends SparkFunSuite {
     val diskStoreMapped = new DiskStore(conf.clone().set(confKey, "0"), diskBlockManager,
       securityManager)
     diskStoreMapped.putBytes(blockId, byteBuffer)
-    val mapped = diskStoreMapped.getBytes(blockId).asInstanceOf[ByteBufferBlockData].buffer
+    val mapped = diskStoreMapped.getBytes(blockId).toByteBuffer()
     assert(diskStoreMapped.remove(blockId))
 
     val diskStoreNotMapped = new DiskStore(conf.clone().set(confKey, "1m"), diskBlockManager,
       securityManager)
     diskStoreNotMapped.putBytes(blockId, byteBuffer)
-    val notMapped = diskStoreNotMapped.getBytes(blockId).asInstanceOf[ByteBufferBlockData].buffer
+    val notMapped = diskStoreNotMapped.getBytes(blockId).toByteBuffer()
 
     // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
-    assert(notMapped.getChunks().forall(_.getClass.getName.endsWith("HeapByteBuffer")),
+    assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
       "Expected HeapByteBuffer for un-mapped read")
-    assert(mapped.getChunks().forall(_.isInstanceOf[MappedByteBuffer]),
+    assert(mapped.isInstanceOf[MappedByteBuffer],
       "Expected MappedByteBuffer for mapped read")
 
     def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
@@ -70,8 +70,8 @@ class DiskStoreSuite extends SparkFunSuite {
       array
     }
 
-    assert(Arrays.equals(mapped.toArray, bytes))
-    assert(Arrays.equals(notMapped.toArray, bytes))
+    assert(Arrays.equals(new ChunkedByteBuffer(mapped).toArray, bytes))
+    assert(Arrays.equals(new ChunkedByteBuffer(notMapped).toArray, bytes))
   }
 
   test("block size tracking") {
@@ -92,6 +92,44 @@
     assert(diskStore.getSize(blockId) === 0L)
   }
 
+  test("blocks larger than 2gb") {
+    val conf = new SparkConf()
+      .set("spark.storage.memoryMapLimitForTests", "10k" )
+    val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
+    val diskStore = new DiskStore(conf, diskBlockManager, new SecurityManager(conf))
+
+    val blockId = BlockId("rdd_1_2")
+    diskStore.put(blockId) { chan =>
+      val arr = new Array[Byte](1024)
+      for {
+        _ <- 0 until 20
+      } {
+        val buf = ByteBuffer.wrap(arr)
+        while (buf.hasRemaining()) {
+          chan.write(buf)
+        }
+      }
+    }
+
+    val blockData = diskStore.getBytes(blockId)
+    assert(blockData.size == 20 * 1024)
+
+    val chunkedByteBuffer = blockData.toChunkedByteBuffer(ByteBuffer.allocate)
+    val chunks = chunkedByteBuffer.chunks
+    assert(chunks.size === 2)
+    for (chunk <- chunks) {
+      assert(chunk.limit === 10 * 1024)
+    }
+
+    val e = intercept[IllegalArgumentException]{
+      blockData.toByteBuffer()
+    }
+
+    assert(e.getMessage ===
+      s"requirement failed: can't create a byte buffer of size ${blockData.size}" +
+      " since it exceeds 10.0 KB.")
+  }
+
   test("block data encryption") {
     val testDir = Utils.createTempDir()
     val testData = new Array[Byte](128 * 1024)
