Skip to content

Commit 71cb649

Browse files
committed
covariant state
1 parent aab5abc commit 71cb649

File tree

1 file changed

+5
-9
lines changed

1 file changed

+5
-9
lines changed

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ import fs2.concurrent._
3737
import fs2.internal._
3838
import Pull.StreamPullOps
3939

40-
import scala.annotation.unchecked.uncheckedVariance
41-
4240
/** A stream producing output of type `O` and which may evaluate `F` effects.
4341
*
4442
* - '''Purely functional''' a value of type `Stream[F, O]` _describes_ an effectful computation.
@@ -1512,18 +1510,16 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
15121510
timeout: FiniteDuration
15131511
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = {
15141512

1515-
case class State(os: Chunk[O @uncheckedVariance], supplyEnded: Boolean) {
1513+
case class State[+A](os: Chunk[A], supplyEnded: Boolean) {
15161514

15171515
val size: Long = os.size.toLong
15181516

15191517
// checking if it's empty to avoid early termination of the stream if the producer is faster than consumers
15201518
val streamExhausted: Boolean = supplyEnded && os.isEmpty
15211519

1522-
def endSupply: State = copy(supplyEnded = true)
1523-
1524-
def add(o: O): State = copy(os = os ++ Chunk.singleton(o))
1520+
def endSupply: State[A] = copy(supplyEnded = true)
15251521

1526-
def splitAt(idx: Long): (State, Chunk[O]) = {
1522+
def splitAt(idx: Long): (State[A], Chunk[A]) = {
15271523
val (flushed, kept) = os.splitAt(idx.toInt)
15281524
(copy(os = kept), flushed)
15291525
}
@@ -1536,14 +1532,14 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
15361532
else
15371533
Stream.force {
15381534
for {
1539-
state <- Ref.of(State(os = Chunk.empty, supplyEnded = false))
1535+
state <- Ref.of(State[O](os = Chunk.empty, supplyEnded = false))
15401536
supply <- Semaphore[F2](0)
15411537
} yield {
15421538

15431539
val groupSize = chunkSize.toLong
15441540

15451541
val enqueue: F2[Unit] =
1546-
evalTap(o => state.update(_.add(o)) *> supply.release)
1542+
evalTap(o => state.update(s => s.copy(os = s.os ++ Chunk.singleton(o))) *> supply.release)
15471543
.covary[F2]
15481544
.compile
15491545
.drain

0 commit comments

Comments
 (0)