@@ -24,7 +24,7 @@ package grpc
2424package client
2525
2626import cats .implicits ._
27- import cats .effect .Concurrent
27+ import cats .effect .{ Concurrent , Ref }
2828import cats .effect .std .Queue
2929
3030private [grpc] trait StreamIngest [F [_], T ] {
@@ -39,41 +39,63 @@ private[grpc] object StreamIngest {
3939 request : Int => F [Unit ],
4040 prefetchN : Int
4141 ): F [StreamIngest [F , T ]] =
42- Queue
43- .unbounded[F , Either [Option [Throwable ], T ]]
44- .map(q => create[F , T ](request, prefetchN, q))
42+ (Ref [F ].of(0 ), Queue .unbounded[F , Either [Option [Throwable ], T ]])
43+ .mapN((r, q) => create[F , T ](request, prefetchN, r, q))
4544
4645 def create [F [_], T ](
4746 request : Int => F [Unit ],
4847 prefetchN : Int ,
48+ requested : Ref [F , Int ],
4949 queue : Queue [F , Either [Option [Throwable ], T ]]
5050 )(implicit F : Concurrent [F ]): StreamIngest [F , T ] = new StreamIngest [F , T ] {
51-
52- val limit : Int =
53- math.max(1 , prefetchN)
54-
55- val ensureMessages : F [Unit ] =
56- queue.size.flatMap(qs => request(1 ).whenA(qs < limit))
51+ private val limit : Int = math.max(1 , prefetchN)
52+ private def updateRequests : F [Unit ] = {
53+ queue.size.flatMap { queued =>
54+ requested.flatModify { requested =>
55+ val total = queued + requested
56+ val additional = math.max(0 , limit - total)
57+
58+ (
59+ requested + additional,
60+ request(additional).whenA(additional > 0 )
61+ )
62+ }
63+ }
64+ }
5765
5866 def onMessage (msg : T ): F [Unit ] =
59- queue.offer(msg.asRight) *> ensureMessages
67+ queue.offer(msg.asRight) *> requested.update(r => math.max( 0 , r - 1 ))
6068
6169 def onClose (error : Option [Throwable ]): F [Unit ] =
6270 queue.offer(error.asLeft)
6371
6472 val messages : Stream [F , T ] = {
65-
66- val run : F [Option [T ]] =
67- queue.take.flatMap {
68- case Right (v) => ensureMessages *> v.some.pure[F ]
69- case Left (Some (error)) => F .raiseError(error)
70- case Left (None ) => none[T ].pure[F ]
73+ type S = Either [Option [Throwable ], Chunk [T ]]
74+
75+ def zero : S = Chunk .empty.asRight
76+ def loop (state : S ): F [Option [(Chunk [T ], S )]] =
77+ state match {
78+ case Left (None ) => F .pure(none)
79+ case Left (Some (err)) => F .raiseError(err)
80+ case Right (acc) =>
81+ queue.tryTake.flatMap {
82+ case Some (Right (value)) => loop((acc ++ Chunk .singleton(value)).asRight)
83+ case Some (Left (err)) =>
84+ if (acc.isEmpty) loop(err.asLeft)
85+ else F .pure((acc.toIndexedChunk, err.asLeft).some)
86+ case None =>
87+ val await = if (acc.isEmpty) queue.take.flatMap {
88+ case Right (value) => loop(Chunk .singleton(value).asRight)
89+ case Left (err) => loop(err.asLeft)
90+ }
91+ else F .pure((acc.toIndexedChunk, zero).some)
92+
93+ updateRequests *> await
94+ }
7195 }
7296
73- Stream .repeatEval(run).unNoneTerminate
74-
97+ Stream .unfoldChunkEval(zero)(loop)
7598 }
76-
7799 }
78100
79101}
0 commit comments