Skip to content

Commit 8c4cdc2

Browse files
committed
groupwithin new implementation
1 parent 24ea712 commit 8c4cdc2

File tree

1 file changed

+84
-1
lines changed

1 file changed

+84
-1
lines changed

core/shared/src/main/scala/fs2/Stream.scala

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1401,7 +1401,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
14011401
* @param timeout maximum time that input elements are held in the buffer
14021402
* before being emitted by the resulting stream.
14031403
*/
1404-
def groupWithin[F2[x] >: F[x]](
1404+
def groupWithin1[F2[x] >: F[x]](
14051405
chunkSize: Int,
14061406
timeout: FiniteDuration
14071407
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = {
@@ -1505,6 +1505,89 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
15051505
}
15061506
}
15071507

1508+
/** Groups the elements of this stream into chunks of at most `chunkSize` elements,
  * flushing whatever has been buffered whenever `timeout` elapses first.
  *
  * A zero `timeout` delegates directly to `chunkN(chunkSize)`. Otherwise the source
  * stream is drained on a background fiber into a shared buffer, and every buffered
  * element is mirrored by one permit on a semaphore. The resulting stream repeatedly
  * races "`chunkSize` permits are available" against "`timeout` has elapsed" and emits
  * a chunk accordingly. It stops at the first empty chunk, which is only produced once
  * the source has ended and the buffer is empty.
  *
  * @param chunkSize the maximum number of elements in each emitted chunk
  * @param timeout   how long to wait for `chunkSize` elements before emitting whatever
  *                  is currently buffered
  */
def groupWithin[F2[x] >: F[x]](
    chunkSize: Int,
    timeout: FiniteDuration
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = {

  // Buffer shared between the producer fiber and the consuming loop.
  //   os          - elements received from the source and not yet emitted
  //   supplyEnded - set once the producer has finished (or has been released)
  case class State(os: Chunk[O], supplyEnded: Boolean) {

    val size: Long = os.size.toLong

    // checking if it's empty to avoid early termination of the stream if the producer is faster than consumers
    val streamExhausted: Boolean = supplyEnded && os.isEmpty

    def endSupply: State = copy(supplyEnded = true)

    def add(o: O): State = copy(os = os ++ Chunk.singleton(o))

    // Removes the first `idx` buffered elements; returns the remaining state and the removed chunk.
    def splitAt(idx: Long): (State, Chunk[O]) = {
      val (flushed, kept) = os.splitAt(idx.toInt)
      (copy(os = kept), flushed)
    }

    override def toString =
      s"State(size = $size, os = $os, supplyEnded = $supplyEnded, streamExhausted = $streamExhausted)"
  }

  if (timeout.toNanos == 0) chunkN(chunkSize)
  else
    Stream.force {
      for {
        state <- Ref.of(State(os = Chunk.empty, supplyEnded = false))
        supply <- Semaphore[F2](0)
      } yield {

        val groupSize = chunkSize.toLong

        // Producer: append every source element to the buffer and release one permit for it.
        val enqueue: F2[Unit] =
          evalTap(o => state.update(_.add(o)) *> supply.release)
            .covary[F2]
            .compile
            .drain

        // run at the end when there's no need to wait
        val maxOutSupply = supply.releaseN(Int.MaxValue)

        val markSupplyEnd = state.update(_.endSupply)

        // Whatever way the producer finishes, flag the end of supply and flood the
        // semaphore so that the consumer never blocks waiting for more permits.
        val enqueueAsync = F.start(enqueue.guarantee(markSupplyEnd *> maxOutSupply))

        // Take a full group's worth of permits if they are available right now,
        // otherwise block until a single permit is released.
        // NOTE(review): on the single-permit path the following emitChunk(groupSize) may
        // flush more elements than permits were taken - confirm the accounting is intended.
        val waitNext =
          supply.tryAcquireN(groupSize).flatMap(acquired => supply.acquire.whenA(!acquired))

        // emit any residual element from the buffer (there shouldn't be any)
        val emitLast = state.get.map(_.os)

        // Timeout branch: after `timeout`, report how many elements are currently buffered.
        val byTime = F.sleep(timeout) *> state.get.map(_.size)

        // Size branch: completes once `groupSize` permits could be acquired.
        val bySize = supply.acquireN(groupSize).as(groupSize)

        def emitChunk(n: Long): F2[Chunk[O]] = state.modify(_.splitAt(n))

        val dequeue: F2[Chunk[O]] =
          F.race(bySize, byTime).flatMap {

            // a full group became available before the timeout
            case Left(_) => emitChunk(groupSize)

            // timeout elapsed with a non-empty buffer: flush up to one group
            case Right(bufferSize) if bufferSize != 0 =>
              val n = bufferSize.min(groupSize)

              // lower supply by n since n elements are being flushed
              emitChunk(n) <* supply.tryAcquireN(n)

            // timeout elapsed with an empty buffer: finish if the source is done,
            // otherwise wait for the next element(s) and emit them
            case Right(_) =>
              F.ifM(state.get.map(_.streamExhausted))(emitLast, waitNext *> emitChunk(groupSize))
          }

        // The producer fiber lives for as long as the output stream; on release the
        // supply is marked as ended and the fiber is cancelled.
        Stream.bracket(enqueueAsync)(fib => markSupplyEnd *> fib.cancel) >>
          Stream
            .repeatEval(dequeue)
            .collectWhile { case os if os.nonEmpty => os }
      }
    }
}
1590+
15081591
/** If `this` terminates with `Stream.raiseError(e)`, invoke `h(e)`.
15091592
*
15101593
* @example {{{

0 commit comments

Comments
 (0)