Skip to content

Commit 858b31b

Browse files
committed
timeout handling improvements
1 parent 87bd84f commit 858b31b

File tree

1 file changed

+21
-23
lines changed

1 file changed

+21
-23
lines changed

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,10 +1512,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
15121512

15131513
case class State[+A](os: Chunk[A], supplyEnded: Boolean) {
15141514

1515-
val size: Long = os.size.toLong
1516-
15171515
// checking if it's empty to avoid early termination of the stream if the producer is faster than consumers
1518-
val streamExhausted: Boolean = supplyEnded && os.isEmpty
1516+
def streamExhausted: Boolean = supplyEnded && os.isEmpty
15191517

15201518
def endSupply: State[A] = copy(supplyEnded = true)
15211519

@@ -1525,7 +1523,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
15251523
}
15261524

15271525
override def toString =
1528-
s"State(os = $size, os = $os supplyEnded = $supplyEnded, streamExhausted = $streamExhausted)"
1526+
s"State(size = ${os.size}, supplyEnded = $supplyEnded, streamExhausted = $streamExhausted), os = $os"
15291527
}
15301528

15311529
object State {
@@ -1555,31 +1553,31 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
15551553

15561554
val enqueueAsync = F.start(enqueue.guarantee(markSupplyEnd *> maxOutSupply))
15571555

1558-
val waitNext =
1559-
supply.tryAcquireN(groupSize).flatMap(acquired => supply.acquire.whenA(!acquired))
1560-
1561-
// emit any residual element from the buffer (there shouldn't be any)
1562-
val emitLast = state.get.map(Chunk.empty[O] ++ _.os)
1556+
// returns the number of elements to be flushed on the left &
1557+
// the number of permits the supply should be lowered by on the right
1558+
val awaitNext: F2[(Long, Long)] = {
15631559

1564-
val byTime = F.sleep(timeout) *> state.get.map(_.size)
1560+
val nextChunk = for {
1561+
_ <- supply.acquire
1562+
a <- supply.available.map(_ + 1) // buffer size = acquired (1) + available
1563+
flushSize = a.min(groupSize) // apply cap (producer is still running)
1564+
} yield (flushSize, flushSize - 1) // decrement permits (one already acquired)
15651565

1566-
val bySize = supply.acquireN(groupSize).as(groupSize)
1566+
F.ifM(supply.tryAcquireN(groupSize))(F.pure(groupSize, groupSize), nextChunk)
1567+
}
15671568

15681569
def emitChunk(n: Long): F2[Chunk[O]] = state.modify(_.splitAt(n))
15691570

1570-
val dequeue: F2[Chunk[O]] =
1571-
F.race(bySize, byTime).flatMap {
1572-
1573-
case Left(_) => emitChunk(groupSize)
1574-
1575-
case Right(bufferSize) if bufferSize != 0 =>
1576-
val n = bufferSize.min(groupSize)
1577-
1578-
// lower supply by n since n elements are being flushed
1579-
emitChunk(n) <* supply.tryAcquireN(n)
1571+
val onTimeout =
1572+
F.ifM(state.get.map(_.streamExhausted))(
1573+
F.pure(Chunk.empty[O]), // emit an empty chunk to end the stream
1574+
awaitNext.flatMap { case (n, p) => emitChunk(n) <* supply.tryAcquireN(p) }
1575+
)
15801576

1581-
case Right(_) =>
1582-
F.ifM(state.get.map(_.streamExhausted))(emitLast, waitNext *> emitChunk(groupSize))
1577+
val dequeue: F2[Chunk[O]] =
1578+
F.race(supply.acquireN(groupSize), F.sleep(timeout)).flatMap {
1579+
case Left(_) => emitChunk(groupSize)
1580+
case Right(_) => onTimeout
15831581
}
15841582

15851583
Stream.bracket(enqueueAsync)(fib => markSupplyEnd *> fib.cancel) >>

0 commit comments

Comments
 (0)