Commit af689fc

Make FallbackStorage.read return a truly lazy ManagedBuffer
1 parent ffb1624 commit af689fc
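
In short: FallbackStorage.read previously copied the whole shuffle segment into an in-memory NioManagedBuffer; after this commit it returns a buffer that only records the segment's coordinates, so nothing is read from the fallback file system until the buffer is consumed. A minimal sketch of the caller-visible contract (names like `conf` and `blockId` are illustrative, not from the commit):

// A minimal sketch; `conf` (SparkConf) and `blockId` (BlockId) are assumed to exist.
val buffer: ManagedBuffer = FallbackStorage.read(conf, blockId)

// Before this commit: the segment's bytes were already resident in memory here,
// wrapped in a NioManagedBuffer.
// After this commit: only the index file was read (offset and size); the data
// file is opened, seeked, and read only when the stream below is consumed.
Utils.tryWithResource(buffer.createInputStream()) { in =>
  val bytes = in.readAllBytes() // at most size() bytes, capped by LimitedInputStream
}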

File tree: 1 file changed (+53, −13)

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 53 additions & 13 deletions
@@ -17,12 +17,13 @@
 
 package org.apache.spark.storage
 
-import java.io.DataInputStream
+import java.io.{DataInputStream, InputStream}
 import java.nio.ByteBuffer
 
 import scala.concurrent.Future
 import scala.reflect.ClassTag
 
+import io.netty.buffer.Unpooled
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -31,8 +32,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
-import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.util.{JavaUtils, LimitedInputStream}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
@@ -114,6 +115,51 @@ private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf
   }
 }
 
+/**
+ * Lazily reads a segment of a Hadoop FileSystem file, i.e. only when createInputStream is called.
+ * @param filesystem hadoop filesystem
+ * @param file path of the file
+ * @param offset offset of the segment
+ * @param length size of the segment
+ */
+private[storage] class FileSystemSegmentManagedBuffer(
+    filesystem: FileSystem,
+    file: Path,
+    offset: Long,
+    length: Long) extends ManagedBuffer with Logging {
+
+  override def size(): Long = length
+
+  override def nioByteBuffer(): ByteBuffer = {
+    Utils.tryWithResource(createInputStream()) { in =>
+      ByteBuffer.wrap(in.readAllBytes())
+    }
+  }
+
+  override def createInputStream(): InputStream = {
+    val startTimeNs = System.nanoTime()
+    try {
+      val in = filesystem.open(file)
+      in.seek(offset)
+      new LimitedInputStream(in, length)
+    } finally {
+      logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
+    }
+  }
+
+  override def retain(): ManagedBuffer = this
+
+  override def release(): ManagedBuffer = this
+
+  override def convertToNetty(): AnyRef = {
+    Unpooled.wrappedBuffer(nioByteBuffer())
+  }
+
+  override def convertToNettyForSsl(): AnyRef = {
+    Unpooled.wrappedBuffer(nioByteBuffer())
+  }
+}
+
 private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
@@ -168,7 +214,9 @@ private[spark] object FallbackStorage extends Logging {
   }
 
   /**
-   * Read a ManagedBuffer.
+   * Read a block as a ManagedBuffer. This reads the index to obtain the block's offset and
+   * size, but does not read the actual block data. That data is read later, when
+   * createInputStream() is called on the returned ManagedBuffer.
    */
   def read(conf: SparkConf, blockId: BlockId): ManagedBuffer = {
     logInfo(log"Read ${MDC(BLOCK_ID, blockId)}")
@@ -202,15 +250,7 @@ private[spark] object FallbackStorage extends Logging {
         val hash = JavaUtils.nonNegativeHash(name)
        val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
        val size = nextOffset - offset
-        logDebug(s"To byte array $size")
-        val array = new Array[Byte](size.toInt)
-        val startTimeNs = System.nanoTime()
-        Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
-          f.seek(offset)
-          f.readFully(array)
-          logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
-        }
-        new NioManagedBuffer(ByteBuffer.wrap(array))
+        new FileSystemSegmentManagedBuffer(fallbackFileSystem, dataFile, offset, size)
       }
     }
  }
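
For illustration, a small standalone sketch of how the new FileSystemSegmentManagedBuffer defers I/O. The class is private[storage], so this would only compile inside that package, and the file path and sizes below are made up:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical setup: a local Hadoop FileSystem and a made-up data file path.
val fs = FileSystem.getLocal(new Configuration())
val dataFile = new Path("/tmp/fallback/app-1/0/7/shuffle_0_0_0.data")

// Constructing the buffer performs no I/O at all; it merely captures
// (filesystem, file, offset, length), which is what makes read() lazy.
val buffer = new FileSystemSegmentManagedBuffer(fs, dataFile, 128L, 1024L)
assert(buffer.size() == 1024L)

// Only now is the file opened and seeked; LimitedInputStream caps the
// stream at `length` bytes so the next segment is never over-read.
val in = buffer.createInputStream()
try {
  val firstByte = in.read()
} finally {
  in.close()
}

Note the design choice visible in the diff: retain() and release() are no-ops because the buffer holds no pooled resources; each createInputStream() opens a fresh stream that the caller owns and must close. nioByteBuffer(), convertToNetty() and convertToNettyForSsl() still materialize the whole segment in memory, so only the streaming path is truly lazy.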

0 commit comments