@@ -40,6 +40,42 @@ class IoPlatformSuite extends Fs2Suite {
4040 // This suite runs for a long time, this avoids timeouts in CI.
4141 override def munitIOTimeout : Duration = 2 .minutes
4242
43+ group(" readInputStream" ) {
44+ test(" reuses internal buffer on smaller chunks" ) {
45+ forAllF { (bytes : Array [Byte ], chunkSize0 : Int ) =>
46+ val chunkSize = (chunkSize0 % 20 ).abs + 1
47+ fs2.Stream
48+ .chunk(Chunk .array(bytes))
49+ .chunkN(chunkSize / 3 + 1 )
50+ .unchunks
51+ .covary[IO ]
52+ // we know that '.toInputStream' reads by chunk
53+ .through(fs2.io.toInputStream)
54+ .flatMap(is => io.readInputStream(IO (is), chunkSize))
55+ .chunks
56+ .zipWithPrevious
57+ .assertForall {
58+ case (None , _) => true // skip first element
59+ case (_, _ : Chunk .Singleton [_]) => true // skip singleton bytes
60+ case (Some (_ : Chunk .Singleton [_]), _) => true // skip singleton bytes
61+ case (Some (Chunk .ArraySlice (bs1, o1, l1)), Chunk .ArraySlice (bs2, o2, _)) =>
62+ {
63+ // if first slice buffer is not 'full'
64+ (bs1.length != (o1 + l1)) &&
65+ // we expect that next slice will wrap same buffer
66+ ((bs2 eq bs1) && (o2 == o1 + l1))
67+ } || {
68+ // if first slice buffer is 'full'
69+ (bs2.length == (o1 + l1)) &&
70+ // we expect new buffer allocated for next slice
71+ ((bs2 ne bs1) && (o2 == 0 ))
72+ }
73+ case _ => false // unexpected chunk subtype
74+ }
75+ }
76+ }
77+ }
78+
4379 group(" readOutputStream" ) {
4480 test(" writes data and terminates when `f` returns" ) {
4581 forAllF { (bytes : Array [Byte ], chunkSize0 : Int ) =>
0 commit comments