Skip to content

Commit d676edd

Browse files
committed
minor improvements and scalafmt
1 parent e417d17 commit d676edd

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,13 +1429,13 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
14291429
(isBufferEmpty, supplyEnded.get).mapN(_ && _)
14301430

14311431
// releasing a number of permits equal to {groupSize} is enough in most cases, but in
1432-
// order to ensure prompt termination of the consumer on interruption when the timeout
1432+
// order to ensure prompt termination of the consumer on interruption even when the timeout
14331433
// has not kicked in yet nor we've seen enough elements we need to max out the supply
14341434
val maxOutSupply =
14351435
supply.available.flatMap(av => supply.releaseN(Long.MaxValue - av))
14361436

1437-
// enabling termination of the consumer stream when the producer completes
1438-
// naturally (i.e runs out of elements)
1437+
// enabling termination of the consumer stream when the producer completes naturally
1438+
// (i.e runs out of elements) or when the combined stream (consumer + producer) is interrupted
14391439
val endSupply: F2[Unit] = supplyEnded.set(true) *> maxOutSupply
14401440

14411441
val enqueue: F2[Unit] =

core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,7 @@ class StreamCombinatorsSuite extends Fs2Suite {
833833
.assertEquals(0.millis)
834834
}
835835

836-
test("Upstream failures are propagated downstream") {
836+
test("upstream failures are propagated downstream") {
837837

838838
case object SevenNotAllowed extends NoStackTrace
839839

@@ -847,7 +847,7 @@ class StreamCombinatorsSuite extends Fs2Suite {
847847
downstream.compile.lastOrError.intercept[SevenNotAllowed.type]
848848
}
849849

850-
test("Upstream interruption causes immediate downstream termination with all elements being released") {
850+
test("upstream interruption causes immediate downstream termination with all elements being emitted") {
851851

852852
val sourceTimeout = 5.5.seconds
853853
val downstreamTimeout = sourceTimeout + 2.seconds
@@ -868,7 +868,7 @@ class StreamCombinatorsSuite extends Fs2Suite {
868868
// specified, unless source ends, due to interruption or
869869
// natural termination (i.e runs out of elements)
870870
val downstream: Stream[IO, Chunk[Int]] =
871-
source.groupWithin(Int.MaxValue / 2 + 4, 1.day)
871+
source.groupWithin(Int.MaxValue, 1.day)
872872

873873
downstream.compile.lastOrError
874874
.map(_.toList)

0 commit comments

Comments
 (0)