@@ -22,10 +22,11 @@ import java.nio.file.Files
 import scala.concurrent.duration._
 import scala.util.Random
 
+import io.netty.buffer.ByteBuf
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
 import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{mock, never, verify, when}
+import org.mockito.Mockito.{mock, never, spy, times, verify, when}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
@@ -155,9 +156,51 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L, NOOP_REDUCE_ID).name))
 
     val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
+    assert(readResult.isInstanceOf[FallbackStorageManagedBuffer])
     assert(readResult.nioByteBuffer().array().sameElements(content))
   }
 
163+ test(" SPARK-55469: FallbackStorageManagedBuffer reads block data lazily" ) {
164+ withTempDir { dir =>
165+ val fs = FileSystem .getLocal(new Configuration ())
166+ val file = new Path (dir.getAbsolutePath, " file" )
167+ val data = Array [Byte ](1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 )
168+ tryWithResource(fs.create(file)) { os => os.write(data) }
169+
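+      // slices cover the start, middle, and end of the file, including a zero-length slice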
+      Seq((0, 4), (1, 2), (4, 4), (7, 2), (8, 0)).foreach { case (offset, length) =>
+        val clue = s"offset: $offset, length: $length"
+
+        // creating the managed buffer does not open the file
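+        // (the Mockito spy wraps the real FileSystem so open() calls can be counted)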
+        val mfs = spy(fs)
+        val buf = new FallbackStorageManagedBuffer(mfs, file, offset, length)
+        verify(mfs, never()).open(mc.any[Path]())
+        assert(buf.size() === length, clue)
+
+        // creating the input stream opens the file
+        {
+          val bytes = buf.createInputStream().readAllBytes()
+          verify(mfs, times(1)).open(mc.any[Path]())
+          assert(bytes.mkString(",") === data.slice(offset, offset + length).mkString(","), clue)
+        }
+
+        // getting a NIO ByteBuffer opens the file again
+        {
+          val bytes = buf.nioByteBuffer().array()
+          verify(mfs, times(2)).open(mc.any[Path]())
+          assert(bytes.mkString(",") === data.slice(offset, offset + length).mkString(","), clue)
+        }
+
+        // converting to Netty buffers opens the file again; release() returns true only for non-empty buffers
+        assert(buf.convertToNetty().asInstanceOf[ByteBuf].release() === (length > 0), clue)
+        verify(mfs, times(3)).open(mc.any[Path]())
+        assert(buf.convertToNettyForSsl().asInstanceOf[ByteBuf].release() === (length > 0), clue)
+        verify(mfs, times(4)).open(mc.any[Path]())
+      }
+    }
+  }
+
   test("SPARK-34142: fallback storage API - cleanUp app") {
     withTempDir { dir =>
       Seq(true, false).foreach { cleanUp =>
@@ -372,6 +415,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 }
+
 class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
   with Seekable with PositionedReadable {
   override def read: Int = in.read
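
For context, the diff does not show FallbackStorageManagedBuffer itself. Below is a minimal sketch of how such a lazily-reading ManagedBuffer could be implemented, inferred from the assertions above; the constructor shape matches the test, but the method bodies and the use of LimitedInputStream and Utils.tryWithResource are assumptions, not Spark's actual implementation.

import java.io.InputStream
import java.nio.ByteBuffer

import io.netty.buffer.Unpooled
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.util.LimitedInputStream
import org.apache.spark.util.Utils

// Sketch: every accessor opens the file on demand; nothing is read eagerly,
// which is the behavior the spy-based test above verifies.
class FallbackStorageManagedBuffer(
    fs: FileSystem,
    file: Path,
    offset: Long,
    length: Long) extends ManagedBuffer {

  // The slice length is known up front, so size() requires no I/O.
  override def size(): Long = length

  override def createInputStream(): InputStream = {
    val in = fs.open(file) // the first fs.open() happens here, not in the constructor
    in.seek(offset)
    new LimitedInputStream(in, length)
  }

  override def nioByteBuffer(): ByteBuffer = {
    val bytes = new Array[Byte](length.toInt)
    Utils.tryWithResource(fs.open(file)) { in =>
      in.readFully(offset, bytes) // positioned read of exactly this slice
    }
    ByteBuffer.wrap(bytes)
  }

  // Wrapping a zero-length ByteBuffer yields Netty's shared EMPTY_BUFFER, whose
  // release() returns false, matching the `release() === (length > 0)` asserts.
  override def convertToNetty(): AnyRef = Unpooled.wrappedBuffer(nioByteBuffer())
  override def convertToNettyForSsl(): AnyRef = convertToNetty()

  // No resources are held between calls, so reference counting is a no-op here.
  override def retain(): ManagedBuffer = this
  override def release(): ManagedBuffer = this
}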