Skip to content

Commit f35fb99

Browse files
committed
fixup! Make FallbackStorage.read return a truly lazy ManagedBuffer
1 parent 4d25f7a commit f35fb99

File tree

1 file changed

+2
-3
lines changed

1 file changed

+2
-3
lines changed

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
23    23    import scala.concurrent.Future
24    24    import scala.reflect.ClassTag
25    25
26    -     import com.google.common.io.ByteStreams
27    26    import io.netty.buffer.Unpooled
28    27    import org.apache.hadoop.conf.Configuration
29    28    import org.apache.hadoop.fs.{FileSystem, Path}
@@ -34,7 +33,7 @@ import org.apache.spark.internal.Logging
34    33    import org.apache.spark.internal.LogKeys._
35    34    import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
36    35    import org.apache.spark.network.buffer.ManagedBuffer
37    -     import org.apache.spark.network.util.JavaUtils
      36+   import org.apache.spark.network.util.{JavaUtils, LimitedInputStream}
38    37    import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
39    38    import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
40    39    import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
@@ -142,7 +141,7 @@ private[storage] class FallbackStorageManagedBuffer (
142   141       try {
143   142         val in = fallbackFileSystem.open(dataFile)
144   143         in.seek(offset)
145   -           ByteStreams.limit(in, blockSize)
      144+        new LimitedInputStream(in, blockSize)
146   145       } finally {
147   146         logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
148   147       }

0 commit comments

Comments (0)