Skip to content

Commit d0ba384

Browse files
committed
interrupt behaviour change, test improvements
1 parent 1693352 commit d0ba384

File tree

2 files changed

+42
-27
lines changed

2 files changed

+42
-27
lines changed

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1525,26 +1525,25 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
15251525
buffer.tryTakeN(Some(groupSize.toInt)).map(Chunk.seq)
15261526

15271527
// we need to check the buffer size, rather than the available supply since
1528-
// the supply is maxed out at the end so won't always indicate accurately the buffer size
1528+
// the supply is increased at the end so it won't always report the buffer size accurately
15291529
val isBufferEmpty: F2[Boolean] =
15301530
buffer.size.map(_ == 0)
15311531

15321532
val streamExhausted: F2[Boolean] =
15331533
(isBufferEmpty, supplyEnded.get).mapN(_ && _)
15341534

1535-
// releasing a number of permits equal to {groupSize} should be enough,
1536-
// but in order to ensure termination of the consumer when the producer
1537-
// stream is cancelled we need to max out the supply
1538-
val maxOutSupply: F2[Unit] =
1539-
supply.available.flatMap(a => supply.releaseN(Long.MaxValue - a))
1535+
// releasing a number of permits equal to {groupSize} is enough in most cases, but in
1536+
// order to ensure prompt termination of the consumer on interruption when the timeout
1537+
// has not kicked in yet nor we've seen enough elements we need to max out the supply
1538+
val maxOutSupply =
1539+
supply.available.flatMap(av => supply.releaseN(Long.MaxValue - av))
15401540

15411541
// enabling termination of the consumer stream when the producer completes
15421542
// naturally (i.e runs out of elements)
1543-
val markSupplyEnd: F2[Unit] = supplyEnded.set(true)
1543+
val endSupply: F2[Unit] = supplyEnded.set(true) *> maxOutSupply
15441544

15451545
val enqueue: F2[Unit] =
1546-
foreach(buffer.offer(_) <* supply.release).compile.drain
1547-
.guarantee(markSupplyEnd *> maxOutSupply)
1546+
foreach(buffer.offer(_) <* supply.release).compile.drain.guarantee(endSupply)
15481547

15491548
val awaitAndEmitNext: F2[Chunk[O]] = for {
15501549
isEmpty <- isBufferEmpty

core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,7 @@ class StreamCombinatorsSuite extends Fs2Suite {
833833
.assertEquals(0.millis)
834834
}
835835

836-
test("Propagation: upstream failures are propagated downstream") {
836+
test("Upstream failures are propagated downstream") {
837837

838838
case object SevenNotAllowed extends NoStackTrace
839839

@@ -847,25 +847,41 @@ class StreamCombinatorsSuite extends Fs2Suite {
847847
downstream.compile.lastOrError.intercept[SevenNotAllowed.type]
848848
}
849849

850-
test("Propagation: upstream cancellation is propagated downstream") {
850+
test("Upstream interruption causes immediate downstream termination with all elements being released") {
851851

852-
def source(counter: Ref[IO, Int]): Stream[IO, Int] = {
853-
Stream
854-
.unfold(0)(s => Some(s, s + 1))
855-
.covary[IO]
856-
.meteredStartImmediately(1.second)
857-
.evalTap(counter.set)
858-
.interruptAfter(5.5.seconds)
859-
}
852+
val sourceTimeout = 5.5.seconds
853+
val downstreamTimeout = sourceTimeout + 2.seconds
860854

861-
def downstream(counter: Ref[IO, Int]): Stream[IO, Chunk[Int]] =
862-
source(counter).groupWithin(Int.MaxValue, 1.day)
863-
864-
(for {
865-
counter <- Ref.of[IO, Int](0)
866-
_ <- downstream(counter).compile.drain
867-
c <- counter.get
868-
} yield c).assertEquals(5)
855+
TestControl
856+
.executeEmbed(
857+
Ref[IO]
858+
.of(0.millis)
859+
.flatMap { ref =>
860+
val source: Stream[IO, Int] =
861+
Stream
862+
.unfold(0)(s => Some(s, s + 1))
863+
.covary[IO]
864+
.meteredStartImmediately(1.second)
865+
.interruptAfter(sourceTimeout)
866+
867+
// large chunkSize and timeout (no emissions expected in the window
868+
// specified, unless source ends, due to interruption or
869+
// natural termination (i.e runs out of elements)
870+
val downstream: Stream[IO, Chunk[Int]] =
871+
source.groupWithin(Int.MaxValue / 2 + 4, 1.day)
872+
873+
downstream.compile.lastOrError
874+
.map(_.toList)
875+
.timeout(downstreamTimeout)
876+
.flatTap(_ => IO.monotonic.flatMap(ref.set))
877+
.flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit)))
878+
}
879+
)
880+
.assertEquals(
881+
// downstream ended immediately (i.e timeLapsed = sourceTimeout)
882+
// emitting whatever was accumulated at the time of interruption
883+
(sourceTimeout, List(0, 1, 2, 3, 4, 5))
884+
)
869885
}
870886
}
871887

0 commit comments

Comments
 (0)