
Commit 7ea594e

squito authored and Marcelo Vanzin committed
[SPARK-25827][CORE] Avoid converting incoming encrypted blocks to byte buffers
## What changes were proposed in this pull request?

Avoid converting encrypted blocks to regular ByteBuffers, to ensure they can be sent over the network for replication & remote reads even when > 2GB. Also updates some TODOs with links to SPARK-25905 for improving the handling here.

## How was this patch tested?

Tested on a cluster with encrypted data > 2GB (after SPARK-25904 was applied as well).

Closes apache#22917 from squito/real_SPARK-25827.

Authored-by: Imran Rashid <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent c71db43 · commit 7ea594e
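The constraint driving this change: java.nio.ByteBuffer capacities are Int-indexed, so the generic fallback that copies a ManagedBuffer into a single ByteBuffer silently truncates sizes above 2GB. A minimal standalone sketch of the failure mode (names and values are illustrative, not code from this commit):

// Hypothetical illustration: why a > 2GB block cannot take the generic
// "copy into one ByteBuffer" path.
import java.nio.ByteBuffer

object TwoGbLimitDemo {
  def main(args: Array[String]): Unit = {
    val blockSize: Long = 3L * 1024 * 1024 * 1024 // a 3 GB encrypted block
    // ByteBuffer.allocate takes an Int, so a Long size must be truncated:
    val truncated = blockSize.toInt // wraps around to -1073741824
    println(s"blockSize=$blockSize truncated=$truncated")
    // ByteBuffer.allocate(truncated) would throw IllegalArgumentException for
    // the negative capacity, so FileSegmentManagedBuffer and (after this
    // commit) EncryptedManagedBuffer are passed through without copying.
  }
}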

File tree

4 files changed (+11 −6 lines)


core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 3 additions & 1 deletion

@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ShuffleClient}
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, EncryptedManagedBuffer, StorageLevel}
 import org.apache.spark.util.ThreadUtils

 private[spark]
@@ -104,6 +104,8 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
           data match {
             case f: FileSegmentManagedBuffer =>
               result.success(f)
+            case e: EncryptedManagedBuffer =>
+              result.success(e)
             case _ =>
               val ret = ByteBuffer.allocate(data.size.toInt)
               ret.put(data.nioByteBuffer())
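The added case mirrors the existing FileSegmentManagedBuffer one: buffers whose bytes live on disk are handed through as-is, and only genuinely in-memory buffers are copied. A self-contained sketch of that dispatch pattern, with hypothetical types standing in for Spark's ManagedBuffer hierarchy (illustration only):

sealed trait Buf { def size: Long }
case class FileBacked(path: String, size: Long) extends Buf // bytes stay on disk
case class Encrypted(path: String, size: Long) extends Buf  // decrypted lazily on read
case class InMemory(bytes: Array[Byte]) extends Buf {
  def size: Long = bytes.length.toLong
}

object PassThroughDemo {
  // Disk-backed buffers are returned untouched, so the Int-sized ByteBuffer
  // copy (and its 2GB ceiling) only ever applies to small in-memory buffers.
  def prepareForSend(b: Buf): Buf = b match {
    case f: FileBacked => f
    case e: Encrypted  => e
    case m: InMemory   => InMemory(m.bytes.clone())
  }
}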

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion

@@ -721,7 +721,7 @@ private[spark] class BlockManager(
    * Get block from remote block managers as serialized bytes.
    */
   def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
-    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
+    // TODO SPARK-25905 if we change this method to return the ManagedBuffer, then getRemoteValues
     // could just use the inputStream on the temp file, rather than reading the file into memory.
     // Until then, replication can cause the process to use too much memory and get killed
     // even though we've read the data to disk.
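The TODO describes the follow-up tracked by SPARK-25905: if this method returned a ManagedBuffer, callers could stream from the temp file instead of loading it fully into memory. A rough sketch of the difference in heap behavior (hypothetical helper names, not Spark code):

import java.io.{File, FileInputStream, InputStream}
import java.nio.file.Files

object StreamVsMaterializeDemo {
  // Today's behavior: the whole remote block is read onto the heap,
  // which is what makes large replications prone to OOM kills.
  def materialize(tmp: File): Array[Byte] =
    Files.readAllBytes(tmp.toPath)

  // The proposed behavior: hand back a stream over the temp file so the
  // consumer deserializes incrementally and heap use stays bounded.
  def stream(tmp: File): InputStream =
    new FileInputStream(tmp)
}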

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 3 additions & 2 deletions

@@ -201,7 +201,7 @@ private class DiskBlockData(
   private def open() = new FileInputStream(file).getChannel
 }

-private class EncryptedBlockData(
+private[spark] class EncryptedBlockData(
     file: File,
     blockSize: Long,
     conf: SparkConf,
@@ -263,7 +263,8 @@ private class EncryptedBlockData(
   }
 }

-private class EncryptedManagedBuffer(val blockData: EncryptedBlockData) extends ManagedBuffer {
+private[spark] class EncryptedManagedBuffer(
+    val blockData: EncryptedBlockData) extends ManagedBuffer {

   // This is the size of the decrypted data
   override def size(): Long = blockData.size
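Widening these classes to private[spark] lets BlockTransferService and ChunkedByteBuffer pattern-match on them from other packages. Structurally, EncryptedManagedBuffer is a thin adapter over EncryptedBlockData; a simplified sketch of that shape (hypothetical trait and names, the real classes implement more of ManagedBuffer):

import java.io.InputStream

trait LazyBlockData {
  def size: Long          // size of the *decrypted* data
  def open(): InputStream // decrypts on read, not up front
}

class LazyManagedBuffer(val blockData: LazyBlockData) {
  // Pure delegation keeps the buffer lazy: nothing is decrypted until a
  // consumer actually opens the stream, so no 2GB copy is ever needed.
  def size(): Long = blockData.size
  def createInputStream(): InputStream = blockData.open()
}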

core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala

Lines changed: 4 additions & 2 deletions

@@ -29,7 +29,7 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
-import org.apache.spark.storage.StorageUtils
+import org.apache.spark.storage.{EncryptedManagedBuffer, StorageUtils}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils

@@ -173,11 +173,13 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
 private[spark] object ChunkedByteBuffer {


-  // TODO eliminate this method if we switch BlockManager to getting InputStreams
+  // TODO SPARK-25905 eliminate this method if we switch BlockManager to getting InputStreams
   def fromManagedBuffer(data: ManagedBuffer): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
         fromFile(f.getFile, f.getOffset, f.getLength)
+      case e: EncryptedManagedBuffer =>
+        e.blockData.toChunkedByteBuffer(ByteBuffer.allocate _)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
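Passing ByteBuffer.allocate _ as the allocator lets toChunkedByteBuffer decrypt into a sequence of bounded chunks instead of one monolithic buffer, keeping every individual ByteBuffer under the Int limit. A hedged sketch of that chunked-read idea (hypothetical helper, not Spark's actual toChunkedByteBuffer):

import java.io.InputStream
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

object ChunkedReadDemo {
  // Copies a stream into ByteBuffers of at most chunkSize bytes; the caller
  // supplies the allocator (heap vs. direct), as in the diff above.
  def readChunked(in: InputStream, chunkSize: Int,
      allocator: Int => ByteBuffer): Array[ByteBuffer] = {
    val chunks = new ArrayBuffer[ByteBuffer]
    val scratch = new Array[Byte](chunkSize)
    var n = in.read(scratch)
    while (n != -1) {
      val buf = allocator(n)
      buf.put(scratch, 0, n)
      buf.flip() // make the chunk readable by consumers
      chunks += buf
      n = in.read(scratch)
    }
    chunks.toArray
  }
}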
