Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2413,7 +2413,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
Pull.output(acc) >> go(hd, size, tl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is another error. The size should ideally be the desired size of the next output chunk, but that is the one being emited in acc. Instead should be -1, so that the next iteration recomputes a new size based on the first chunk of tl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! I realize it now.

else {
val (out, rem) = acc.splitAt(size - 1)
Pull.output(out) >> go(rem ++ hd, -1, tl)
Pull.output(out) >> go(rem, -1, Pull.output(hd) >> tl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Pull.output(hd) >> tl equivalent to s?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I should change it to s. :) Thanks for that.

}
case None =>
Pull.output(acc)
Expand Down
7 changes: 7 additions & 0 deletions core/shared/src/test/scala/fs2/StreamSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,13 @@ class StreamSuite extends Fs2Suite {
assertEquals(countChunks(source), 2)
}

test("rechunkRandomly sometimes emits everything in a single chunk") {
val fiveChunks = Stream(1) ++ Stream(2) ++ Stream(3) ++ Stream(4) ++ Stream(5)

val source = fiveChunks.rechunkRandomlyWithSeed(0.1, 2.0)(5).chunks.toList
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the way factor and nextSize are implemented now, I don't think factor can ever reach maxFactor. So for this example nextSize can only be 0 or 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's true. Now the range is [minFactor, maxFactor).

assert(source.forall(_.size <= 2), "Some Chunks have larger size than maxFactor")
}

group("Stream[F, Either[Throwable, O]]") {
test(".evalMap(_.pure.rethrow).mask <-> .rethrow.mask") {
forAllF { (stream: Stream[Pure, Int]) =>
Expand Down