143 changes: 52 additions & 91 deletions core/shared/src/main/scala/fs2/Stream.scala
@@ -1406,103 +1406,64 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
timeout: FiniteDuration
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = {

case class JunctionBuffer[T](
data: Vector[T],
endOfSupply: Option[Either[Throwable, Unit]],
endOfDemand: Option[Either[Throwable, Unit]]
) {
def splitAt(n: Int): (JunctionBuffer[T], JunctionBuffer[T]) =
if (this.data.size >= n) {
val (head, tail) = this.data.splitAt(n.toInt)
(this.copy(tail), this.copy(head))
} else {
(this.copy(Vector.empty), this)
}
}
val groupSize: Long = chunkSize.toLong

val outputLong = chunkSize.toLong
fs2.Stream.force {
for {
demand <- Semaphore[F2](outputLong)
supply <- Semaphore[F2](0L)
buffer <- Ref[F2].of(
JunctionBuffer[O](Vector.empty[O], endOfSupply = None, endOfDemand = None)
)
} yield {
/* - Buffer: stores items from input to be sent on next output chunk
* - Demand Semaphore: to avoid adding too many items to buffer
* - Supply: counts filled positions for next output chunk */
def enqueue(t: O): F2[Boolean] =
for {
_ <- demand.acquire
buf <- buffer.modify(buf => (buf.copy(buf.data :+ t), buf))
_ <- supply.release
} yield buf.endOfDemand.isEmpty

val dequeueNextOutput: F2[Option[Vector[O]]] = {
// Trigger: waits until the supply buffer is full (with acquireN)
val waitSupply = supply.acquireN(outputLong).guaranteeCase {
case Outcome.Succeeded(_) => supply.releaseN(outputLong)
case _ => F.unit
}
if (timeout.toNanos == 0) chunkN(chunkSize)
else
Stream.force {
for {
supply <- Semaphore[F2](0)
supplyEnded <- Ref.of[F2, Boolean](false)
buffer <- Queue.bounded[F2, O](chunkSize) // buffering and backpressure
} yield {

val onTimeout: F2[Long] =
for {
_ <- supply.acquire // waits until there is at least one element in buffer
m <- supply.available
k = m.min(outputLong - 1)
b <- supply.tryAcquireN(k)
} yield if (b) k + 1 else 1

// in JS cancellation doesn't always seem to run, so race conditions should restore state on their own
for {
acq <- F.race(F.sleep(timeout), waitSupply).flatMap {
case Left(_) => onTimeout
case Right(_) => supply.acquireN(outputLong).as(outputLong)
}
buf <- buffer.modify(_.splitAt(acq.toInt))
_ <- demand.releaseN(buf.data.size.toLong)
res <- buf.endOfSupply match {
case Some(Left(error)) => F.raiseError(error)
case Some(Right(_)) if buf.data.isEmpty => F.pure(None)
case _ => F.pure(Some(buf.data))
val emitChunk: F2[Chunk[O]] =
buffer.tryTakeN(Some(groupSize.toInt)).map(Chunk.seq)

// we need to check the buffer size, rather than the available supply since
// the supply is increased at the end so it won't always report the buffer size accurately
val isBufferEmpty: F2[Boolean] =
buffer.size.map(_ == 0)

val streamExhausted: F2[Boolean] =
(isBufferEmpty, supplyEnded.get).mapN(_ && _)

// releasing a number of permits equal to {groupSize} is enough in most cases, but in
// order to ensure prompt termination of the consumer on interruption, even when the timeout
// has not kicked in yet and we haven't seen enough elements, we need to max out the supply
val maxOutSupply: F2[Unit] =
supply.available.flatMap(av => supply.releaseN(Long.MaxValue - av))

// enabling termination of the consumer stream when the producer completes naturally
// (i.e runs out of elements) or when the combined stream (consumer + producer) is interrupted
val endSupply: F2[Unit] = supplyEnded.set(true) *> maxOutSupply

val enqueue: F2[Unit] =
foreach(buffer.offer(_) <* supply.release).compile.drain.guarantee(endSupply)

val awaitAndEmitNext: F2[Chunk[O]] = for {
isEmpty <- isBufferEmpty
awaited <- supply.acquire.whenA(isEmpty).as(if (isEmpty) 1 else 0)
flushed <- emitChunk
// lower supply by {flushed.size} (excluding element already awaited)
_ <- supply.acquireN((flushed.size.toLong - awaited).max(0))
} yield flushed

val onTimeout: F2[Chunk[O]] =
F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), awaitAndEmitNext)

val dequeue: F2[Chunk[O]] =
F.race(supply.acquireN(groupSize), F.sleep(timeout)).flatMap {
@armanbilge (Member), Mar 22, 2023:

> I've read the issue attached above (race condition), am I right to think that this implementation is potentially better if the race is left-biased, since the timeout logic (on the right) is slightly more convoluted?

Less chance of deadlock is still not as good as no chance of deadlock 😁

I think the current implementation of groupWithin uses a hack, where it always returns the semaphore, and then attempts to re-acquire it. But there's no way to fix this properly without implementing the Ior-based race as described in that issue.
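
For reference, the "hack" referred to here is the waitSupply pattern in the implementation being replaced; it is visible in the deleted lines of this diff and condensed below (names and behaviour are taken from that code, nothing new is added):

    // the raced acquisition hands its permits straight back when it succeeds...
    val waitSupply = supply.acquireN(outputLong).guaranteeCase {
      case Outcome.Succeeded(_) => supply.releaseN(outputLong)
      case _                    => F.unit
    }

    // ...so whichever side wins the race re-acquires them itself, avoiding double-counting
    F.race(F.sleep(timeout), waitSupply).flatMap {
      case Left(_)  => onTimeout                                  // timeout fired first: flush what we have
      case Right(_) => supply.acquireN(outputLong).as(outputLong) // supply filled up first: take a full chunk
    }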

@Angel-O (Contributor Author):

I was struggling a bit to understand the problem (i.e. where the deadlock would occur) until I clicked through a few links from the issue linked above and stumbled across your PR comment here.

If I understood your concern correctly, the issue is that the semaphore might lose the race even if it acquired the permits successfully. In that case we would need to restore them, in order to prevent a deadlock when the onTimeout logic tries to acquire a permit again (a permit that simply won't be there if the semaphore took the permits without releasing them).

If that is the case, I believe this is not a problem in this implementation, because of this logic:

     val isBufferEmpty: F2[Boolean] =
        buffer.size.map(_ == 0)

      val awaitAndEmitNext: F2[Chunk[O]] = for {
        isEmpty <- isBufferEmpty
        awaited <- supply.acquire.whenA(isEmpty).as(if (isEmpty) 1 else 0)
        flushed <- emitChunk
        // lower supply by {flushed.size} (excluding element already awaited)
        _ <- supply.tryAcquireN((flushed.size.toLong - awaited).max(0))
      } yield flushed

Basically, if the buffer contains any element by the time the timeout expires, we won't attempt to acquire the permit in a blocking fashion in the for-comprehension. So by definition the deadlock cannot occur.
In other words, we can rule out the deadlock because if the buffer is empty then we know for a fact that the competing semaphore (the one in the race) hasn't acquired any permits (i.e. it has nothing to release).

@armanbilge (Member):

Thanks for looking more closely at this! Yes, you've definitely understood the issue.

After looking at your code and thinking about it, I think you are right that it won't deadlock. However, I still suspect that the state can get corrupted. Something like this:

1. The race times out, although the semaphore acquireN also succeeds:

       F.race(supply.acquireN(groupSize), F.sleep(timeout)).flatMap {
         case Left(_)  => emitChunk
         case Right(_) => onTimeout

2. We successfully emitChunk via onTimeout and awaitAndEmitNext:

       val onTimeout =
         F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), awaitAndEmitNext)

       val awaitAndEmitNext: F2[Chunk[O]] = for {
         isEmpty <- isBufferEmpty
         awaited <- supply.acquire.whenA(isEmpty).as(if (isEmpty) 1 else 0)
         flushed <- emitChunk

       val emitChunk: F2[Chunk[O]] =
         buffer.tryTakeN(Some(groupSize.toInt)).map(Chunk.seq)

3. This unblocks the buffer, which concurrently gets filled with new stuff and releases permits:

       val enqueue: F2[Unit] =
         foreach(buffer.offer(_) <* supply.release).compile.drain.guarantee(endSupply)

4. The tryAcquireN in awaitAndEmitNext succeeds:

       // lower supply by {flushed.size} (excluding element already awaited)
       _ <- supply.tryAcquireN((flushed.size.toLong - awaited).max(0))
       } yield flushed

5. At this point, we've consumed 2*chunkSize permits to emit chunkSize elements.
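
To spell out the arithmetic in step 5, here is a small, hypothetical sketch (not code from either PR; groupSize = 3 is arbitrary) that replays the interleaving by hand with a bare Semaphore instead of relying on an actual race:

    import cats.effect.IO
    import cats.effect.std.Semaphore

    val groupSize = 3L

    val replay: IO[Long] =
      for {
        supply <- Semaphore[IO](0L)
        _      <- supply.releaseN(groupSize)    // producer enqueued the first groupSize elements
        _      <- supply.acquireN(groupSize)    // the raced acquireN succeeds, but the race still
                                                // reports the timeout, so onTimeout runs as well:
        _      <- supply.releaseN(groupSize)    // producer refills the now-unblocked buffer
        _      <- supply.tryAcquireN(groupSize) // awaitAndEmitNext lowers the supply a second time
        left   <- supply.available
      } yield left                              // 0: 2 * groupSize permits consumed, yet only one
                                                // chunk of groupSize elements was actually emitted

The second batch of elements is now sitting in the buffer with no permits backing it, which is the corrupted state described above.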

@Angel-O (Contributor Author), Mar 22, 2023:

I see, thanks for breaking this down 🙌🏾

Yes, I can see the problem now: if the semaphore loses the race but manages to acquire the permits, we end up with a corrupted state and could potentially deadlock or slow down unnecessarily on the next race.

To be honest I didn't know this was a possibility (losing the race but securing the permits), but I think it could be fixed as follows:

val onTimeout = {
  val edgeCase = (supply.available.map(_ == 0), buffer.size.map(_ == groupSize)).mapN(_ && _)
  val onExpiry = F.ifM(edgeCase)(supply.releaseN(groupSize) *> awaitAndEmitNext, awaitAndEmitNext)
  F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), onExpiry)
}

The edgeCase val effectively describes the problem you have brought up: if, by the time the timeout expires, there are no permits available but the buffer is full, then we know for sure that the semaphore has lost the race but succeeded in acquiring the permits. The fact that the queue is bounded means nothing can interfere at this point and change the available supply or the buffer size (correct me if I'm wrong).

And it's probably an improvement over the hack in the current implementation, which resets the count regardless, just to reacquire the permits shortly after:

val waitSupply = supply.acquireN(outputLong).guaranteeCase {
  case Outcome.Succeeded(_) => supply.releaseN(outputLong)
  case _                    => F.unit
}

Out of curiosity I've run the benchmarks with the above modification, adding a println to see how often this would occur. It did not happen once!! So maybe this is very rare! And perhaps the race being left-biased is also a mitigating factor.

What do you think? Shall I handle the scenario this way, or should I have a go at using the bothOutcome/racePair combinators mentioned in the issue above?

Also, what command should I run to run scalafmt on all sub-projects? CI keeps failing due to formatting 😓.
I've run sbt scalafmt but that didn't work 🤔
done 🤞🏾

@Angel-O (Contributor Author):

> Out of curiosity I've run the benchmarks with the above modification adding a println to see how often this would occur

@armanbilge I have a possible explanation for why this wasn't happening: there's a bug in the timeout logic that causes the permit count to be inaccurate.

val enqueue = foreach(buffer.offer(_) <* supply.release) ...

Since the above is not atomic, consider this logic:

val awaitAndEmitNext: F2[Chunk[O]] = for {
    isEmpty <- isBufferEmpty
    awaited <- supply.acquire.whenA(isEmpty).as(if (isEmpty) 1 else 0)
    flushed <- emitChunk
    // lower supply by {flushed.size} (excluding element already awaited)
    _ <- supply.acquireN((flushed.size.toLong - awaited).max(0))
  } yield flushed

Even if the buffer contains one element, the permit count can be zero. This means the number of permits acquired up front (awaited) can be off by one when we acquire the rest at the end of the for-comprehension. As a result, the next race will be skewed towards the semaphore.

That and the fact that none of the tests have been able to reproduce the scenario.
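
To make that window concrete, here is a tiny, hypothetical sketch (not from the PR; names and sizes are illustrative) using a bare Queue and Semaphore: because offer and release are two separate steps, a consumer can observe a non-empty buffer while the supply still reads zero.

    import cats.effect.IO
    import cats.effect.std.{Queue, Semaphore}

    val window: IO[(Int, Long)] =
      for {
        buffer <- Queue.bounded[IO, Int](10)
        supply <- Semaphore[IO](0L)
        _      <- buffer.offer(1)   // producer step 1: the element is already in the buffer...
        size   <- buffer.size       // a consumer running now sees size == 1
        avail  <- supply.available  // ...but the matching permit has not been released yet, so 0
        _      <- supply.release    // producer step 2 only happens afterwards
      } yield (size, avail)         // yields (1, 0): buffer and permit count disagree

In groupWithin itself, this same gap is what lets awaited end up off by one.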

I've fixed this in the other PR (will push the change sometime today) and I can see the edge case occurring, or at least I've got a test case where this occurs regularly.

The good thing is that the suggested fix seems to be working, and it can even be simplified to:

val onTimeout = {
  val edgeCase = (supply.available.map(_ == 0), buffer.size.map(_ == groupSize)).mapN(_ && _)
  // no need to release permits and await a chunk, we know permits have already been acquired
  // and we know the buffer is full so we can emit immediately
  val onExpiry = F.ifM(edgeCase)(emitNext, awaitAndEmitNext)
  F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), onExpiry)
}

So far tests look good; I will update the other PR later today. Closing this one for now, since the other one seems to be more accurate.

@armanbilge (Member), Mar 27, 2023:

> the fact that none of the tests have been able to reproduce the scenario.

@Angel-O to reproduce this, try using executeEmbed and setting up a scenario where the semaphore gets permits at the exact same time that the timeout expires.
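
A minimal sketch of the kind of setup being suggested, assuming cats-effect-testkit is on the classpath; the names (supply, the 3-permit group, the 1-second timeout) are illustrative only and not taken from the PR:

    import scala.concurrent.duration._
    import cats.effect.IO
    import cats.effect.std.Semaphore
    import cats.effect.testkit.TestControl

    // Under TestControl time is virtual, so the producer can release its permits
    // at exactly the instant the timeout fires, and we can inspect the state after.
    val probe: IO[(Either[Unit, Unit], Long)] =
      for {
        supply <- Semaphore[IO](0L)
        _      <- (IO.sleep(1.second) *> supply.releaseN(3)).start // permits arrive as the timeout expires
        winner <- IO.race(supply.acquireN(3), IO.sleep(1.second))
        left   <- supply.available                                 // how many permits survived the race
      } yield (winner, left)

    // TestControl.executeEmbed(probe) runs the program on a mock clock, letting us
    // check whether the race can report the timeout (Right) while the permits have
    // nonetheless been consumed (left == 0) -- the corruption scenario discussed above.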

@armanbilge (Member):

> I've fixed this in the other PR (will push the change sometime today) and I can see the edge case occurring, or at least I've got a test case where this occurs regularly.

Oh missed this at first, nice work!

@Angel-O (Contributor Author):

Nice!! Using TestControl.executeEmbed is a great idea for this! Will give it a go 👍🏾

case Left(_) => emitChunk
case Right(_) => onTimeout
}
} yield res
}

def endSupply(result: Either[Throwable, Unit]): F2[Unit] =
buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue)

def endDemand(result: Either[Throwable, Unit]): F2[Unit] =
buffer.update(_.copy(endOfDemand = Some(result))) *> demand.releaseN(Int.MaxValue)

def toEnding(ec: ExitCase): Either[Throwable, Unit] = ec match {
case ExitCase.Succeeded => Right(())
case ExitCase.Errored(e) => Left(e)
case ExitCase.Canceled => Right(())
}

val enqueueAsync = F.start {
this
.evalMap(enqueue)
.forall(identity)
.onFinalizeCase(ec => endSupply(toEnding(ec)))
.compile
.drain
}

val outputStream: Stream[F2, Chunk[O]] =
Stream
.eval(dequeueNextOutput)
.repeat
.collectWhile { case Some(data) => Chunk.vector(data) }

Stream
.bracketCase(enqueueAsync) { case (upstream, exitCase) =>
endDemand(toEnding(exitCase)) *> upstream.cancel
} >> outputStream
.repeatEval(dequeue)
.collectWhile { case os if os.nonEmpty => os }
.concurrently(Stream.eval(enqueue))
}
}
}
}

/** If `this` terminates with `Stream.raiseError(e)`, invoke `h(e)`.
54 changes: 54 additions & 0 deletions core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala
@@ -34,6 +34,7 @@ import org.scalacheck.Prop.forAll

import scala.concurrent.duration._
import scala.concurrent.TimeoutException
import scala.util.control.NoStackTrace

class StreamCombinatorsSuite extends Fs2Suite {

@@ -831,6 +832,59 @@ class StreamCombinatorsSuite extends Fs2Suite {
)
.assertEquals(0.millis)
}

test("upstream failures are propagated downstream") {

case object SevenNotAllowed extends NoStackTrace

val source = Stream
.unfold(0)(s => Some((s, s + 1)))
.covary[IO]
.evalMap(n => if (n == 7) IO.raiseError(SevenNotAllowed) else IO.pure(n))

val downstream = source.groupWithin(100, 2.seconds)

downstream.compile.lastOrError.intercept[SevenNotAllowed.type]
}

test(
"upstream interruption causes immediate downstream termination with all elements being emitted"
) {

val sourceTimeout = 5.5.seconds
val downstreamTimeout = sourceTimeout + 2.seconds

TestControl
.executeEmbed(
Ref[IO]
.of(0.millis)
.flatMap { ref =>
val source: Stream[IO, Int] =
Stream
.unfold(0)(s => Some((s, s + 1)))
.covary[IO]
.meteredStartImmediately(1.second)
.interruptAfter(sourceTimeout)

// large chunkSize and timeout: no emissions expected in the window
// specified, unless the source ends, due to interruption or
// natural termination (i.e. it runs out of elements)
val downstream: Stream[IO, Chunk[Int]] =
source.groupWithin(Int.MaxValue, 1.day)

downstream.compile.lastOrError
.map(_.toList)
.timeout(downstreamTimeout)
.flatTap(_ => IO.monotonic.flatMap(ref.set))
.flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit)))
}
)
.assertEquals(
// downstream ended immediately (i.e. timeLapsed == sourceTimeout),
// emitting whatever was accumulated at the time of interruption
(sourceTimeout, List(0, 1, 2, 3, 4, 5))
)
}
}

property("head")(forAll((s: Stream[Pure, Int]) => assertEquals(s.head.toList, s.toList.take(1))))