Skip to content

Commit df5ede1

Browse files
committed
fix 'fs.io.readInputStreamGeneric' overallocation of underlying buffers
1 parent 9bb4462 commit df5ede1

File tree

2 files changed

+17
-14
lines changed

2 files changed

+17
-14
lines changed

io/jvm/src/main/scala/fs2/io/ioplatform.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ private[fs2] trait ioplatform extends iojvmnative {
4343
F.pure(System.in),
4444
F.delay(new Array[Byte](bufSize)),
4545
false
46-
) { (is, buf) =>
46+
) { (is, buf, off) =>
4747
F.async[Int] { cb =>
4848
F.delay {
49-
val task: Runnable = () => cb(Right(is.read(buf)))
49+
val task: Runnable = () => cb(Right(is.read(buf, off, buf.length - off)))
5050
stdinExecutor.submit(task)
5151
}.map { fut =>
5252
Some(F.delay {

io/shared/src/main/scala/fs2/io/io.scala

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ package object io extends ioplatform {
4545
fis,
4646
F.delay(new Array[Byte](chunkSize)),
4747
closeAfterUse
48-
)((is, buf) => F.blocking(is.read(buf)))
48+
)((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)))
4949

5050
private[io] def readInputStreamCancelable[F[_]](
5151
fis: F[InputStream],
@@ -57,7 +57,7 @@ package object io extends ioplatform {
5757
fis,
5858
F.delay(new Array[Byte](chunkSize)),
5959
closeAfterUse
60-
)((is, buf) => F.blocking(is.read(buf)).cancelable(cancel))
60+
)((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)).cancelable(cancel))
6161

6262
/** Reads all bytes from the specified `InputStream` with a buffer size of `chunkSize`.
6363
* Set `closeAfterUse` to false if the `InputStream` should not be closed after use.
@@ -76,28 +76,31 @@ package object io extends ioplatform {
7676
fis,
7777
F.pure(new Array[Byte](chunkSize)),
7878
closeAfterUse
79-
)((is, buf) => F.blocking(is.read(buf)))
79+
)((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)))
8080

81-
private def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte])(
82-
read: (InputStream, Array[Byte]) => F[Int]
81+
private def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte], offset: Int)(
82+
read: (InputStream, Array[Byte], Int) => F[Int]
8383
)(implicit
8484
F: Sync[F]
8585
): F[Option[Chunk[Byte]]] =
86-
read(is, buf).map { numBytes =>
87-
if (numBytes < 0) None
88-
else if (numBytes == 0) Some(Chunk.empty)
89-
else if (numBytes < buf.size) Some(Chunk.array(buf, 0, numBytes))
90-
else Some(Chunk.array(buf))
86+
read(is, buf, offset).flatMap { numBytes =>
87+
if (numBytes < 0) {
88+
if (offset == 0) F.pure(None)
89+
else F.pure(Some(Chunk.array(buf, 0, offset)))
90+
} else {
91+
if (offset + numBytes == buf.size) F.pure(Some(Chunk.array(buf)))
92+
else readBytesFromInputStream(is, buf, offset + numBytes)(read)
93+
}
9194
}
9295

9396
private[fs2] def readInputStreamGeneric[F[_]](
9497
fis: F[InputStream],
9598
buf: F[Array[Byte]],
9699
closeAfterUse: Boolean
97-
)(read: (InputStream, Array[Byte]) => F[Int])(implicit F: Sync[F]): Stream[F, Byte] = {
100+
)(read: (InputStream, Array[Byte], Int) => F[Int])(implicit F: Sync[F]): Stream[F, Byte] = {
98101
def useIs(is: InputStream) =
99102
Stream
100-
.eval(buf.flatMap(b => readBytesFromInputStream(is, b)(read)))
103+
.eval(buf.flatMap(b => readBytesFromInputStream(is, b, 0)(read)))
101104
.repeat
102105
.unNoneTerminate
103106
.flatMap(c => Stream.chunk(c))

0 commit comments

Comments
 (0)