Skip to content

Commit e86e67d

Browse files
Merge pull request #797 from seigert/improve_stream_ingest_thoughtput
Improve `StreamIngest` thoughtput by prefetching in bulk and chunking of fetched messages
2 parents 87e1535 + ebe8644 commit e86e67d

File tree

8 files changed

+67
-31
lines changed

8 files changed

+67
-31
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ inThisBuild(
4141
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.grpc.client.StreamIngest.create"),
4242
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.grpc.server.Fs2StreamServerCallListener*"),
4343
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.grpc.client.Fs2StreamClientCallListener*"),
44+
ProblemFilters.exclude[MissingClassProblem]("fs2.grpc.client.StreamIngest*"),
4445
ProblemFilters.exclude[MissingClassProblem]("fs2.grpc.codegen.Fs2GrpcServicePrinter$constants$"),
4546
ProblemFilters.exclude[MissingFieldProblem]("fs2.grpc.codegen.Fs2GrpcServicePrinter.constants"),
4647
// deleted private classes

runtime/src/main/scala/fs2/grpc/client/Fs2ClientCall.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class Fs2ClientCall[F[_], Request, Response] private[client] (
109109
): Resource[F, Fs2StreamClientCallListener[F, Response]] = {
110110
val prefetchN = options.prefetchN.max(1)
111111
val create = Fs2StreamClientCallListener.create[F, Response](request, signalReadiness, dispatcher, prefetchN)
112-
val acquire = start(create, md) <* request(prefetchN)
112+
val acquire = start(create, md)
113113
val release = handleExitCase(cancelSucceed = true)
114114

115115
Resource.makeCase(acquire)(release)

runtime/src/main/scala/fs2/grpc/client/Fs2StreamClientCallListener.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import cats.effect.SyncIO
2727
import cats.implicits._
2828
import cats.effect.kernel.Concurrent
2929
import cats.effect.std.Dispatcher
30+
import fs2.grpc.shared.StreamIngest
3031
import io.grpc.{ClientCall, Metadata, Status}
3132

3233
private[client] class Fs2StreamClientCallListener[F[_], Response] private (

runtime/src/main/scala/fs2/grpc/server/Fs2StreamServerCallListener.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import cats.syntax.all._
2828
import cats.effect.kernel.Deferred
2929
import cats.effect.{Async, SyncIO}
3030
import cats.effect.std.Dispatcher
31-
import fs2.grpc.client.StreamIngest
31+
import fs2.grpc.shared.StreamIngest
3232
import io.grpc.ServerCall
3333

3434
private[server] class Fs2StreamServerCallListener[F[_], Request, Response] private (
@@ -67,7 +67,8 @@ private[server] object Fs2StreamServerCallListener {
6767
)(implicit F: Async[F]): F[Fs2StreamServerCallListener[F, Request, Response]] = for {
6868
isCancelled <- Deferred[F, Unit]
6969
request = (n: Int) => F.delay(call.request(n))
70-
ingest <- StreamIngest[F, Request](request, prefetchN = 1)
70+
prefetchN = math.max(options.prefetchN, 1)
71+
ingest <- StreamIngest[F, Request](request, prefetchN)
7172
serverCall <- Fs2ServerCall[F, Request, Response](call, options)
7273
} yield new Fs2StreamServerCallListener[F, Request, Response](
7374
ingest,

runtime/src/main/scala/fs2/grpc/server/ServerOptions.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,22 @@ package grpc
2424
package server
2525

2626
sealed abstract class ServerOptions private (
27+
val prefetchN: Int,
2728
val callOptionsFn: ServerCallOptions => ServerCallOptions
2829
) {
2930

3031
private def copy(
31-
callOptionsFn: ServerCallOptions => ServerCallOptions
32-
): ServerOptions = new ServerOptions(callOptionsFn) {}
32+
prefetchN: Int = this.prefetchN,
33+
callOptionsFn: ServerCallOptions => ServerCallOptions = this.callOptionsFn
34+
): ServerOptions = new ServerOptions(prefetchN, callOptionsFn) {}
35+
36+
/** Prefetch up to @param n messages from a client. The server will try to keep the internal buffer filled according
37+
* to the provided value.
38+
*
39+
* If the provided value is less than 1 it defaults to 1.
40+
*/
41+
def withPrefetchN(n: Int): ServerOptions =
42+
copy(prefetchN = math.max(n, 1))
3343

3444
/** Function that is applied on `fs2.grpc.ServerCallOptions.default` for each new RPC call.
3545
*/
@@ -40,6 +50,7 @@ sealed abstract class ServerOptions private (
4050
object ServerOptions {
4151

4252
val default: ServerOptions = new ServerOptions(
53+
prefetchN = 1,
4354
callOptionsFn = identity
4455
) {}
4556

runtime/src/main/scala/fs2/grpc/client/StreamIngest.scala renamed to runtime/src/main/scala/fs2/grpc/shared/StreamIngest.scala

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
package fs2
2323
package grpc
24-
package client
24+
package shared
2525

2626
import cats.implicits._
27-
import cats.effect.Concurrent
27+
import cats.effect.{Concurrent, Ref}
2828
import cats.effect.std.Queue
2929

3030
private[grpc] trait StreamIngest[F[_], T] {
@@ -39,41 +39,63 @@ private[grpc] object StreamIngest {
3939
request: Int => F[Unit],
4040
prefetchN: Int
4141
): F[StreamIngest[F, T]] =
42-
Queue
43-
.unbounded[F, Either[Option[Throwable], T]]
44-
.map(q => create[F, T](request, prefetchN, q))
42+
(Ref[F].of(0), Queue.unbounded[F, Either[Option[Throwable], T]])
43+
.mapN((r, q) => create[F, T](request, prefetchN, r, q))
4544

4645
def create[F[_], T](
4746
request: Int => F[Unit],
4847
prefetchN: Int,
48+
requested: Ref[F, Int],
4949
queue: Queue[F, Either[Option[Throwable], T]]
5050
)(implicit F: Concurrent[F]): StreamIngest[F, T] = new StreamIngest[F, T] {
51-
52-
val limit: Int =
53-
math.max(1, prefetchN)
54-
55-
val ensureMessages: F[Unit] =
56-
queue.size.flatMap(qs => request(1).whenA(qs < limit))
51+
private val limit: Int = math.max(1, prefetchN)
52+
private def updateRequests: F[Unit] = {
53+
queue.size.flatMap { queued =>
54+
requested.flatModify { requested =>
55+
val total = queued + requested
56+
val additional = math.max(0, limit - total)
57+
58+
(
59+
requested + additional,
60+
request(additional).whenA(additional > 0)
61+
)
62+
}
63+
}
64+
}
5765

5866
def onMessage(msg: T): F[Unit] =
59-
queue.offer(msg.asRight) *> ensureMessages
67+
queue.offer(msg.asRight) *> requested.update(r => math.max(0, r - 1))
6068

6169
def onClose(error: Option[Throwable]): F[Unit] =
6270
queue.offer(error.asLeft)
6371

6472
val messages: Stream[F, T] = {
65-
66-
val run: F[Option[T]] =
67-
queue.take.flatMap {
68-
case Right(v) => ensureMessages *> v.some.pure[F]
69-
case Left(Some(error)) => F.raiseError(error)
70-
case Left(None) => none[T].pure[F]
73+
type S = Either[Option[Throwable], Chunk[T]]
74+
75+
def zero: S = Chunk.empty.asRight
76+
def loop(state: S): F[Option[(Chunk[T], S)]] =
77+
state match {
78+
case Left(None) => F.pure(none)
79+
case Left(Some(err)) => F.raiseError(err)
80+
case Right(acc) =>
81+
queue.tryTake.flatMap {
82+
case Some(Right(value)) => loop((acc ++ Chunk.singleton(value)).asRight)
83+
case Some(Left(err)) =>
84+
if (acc.isEmpty) loop(err.asLeft)
85+
else F.pure((acc.toIndexedChunk, err.asLeft).some)
86+
case None =>
87+
val await = if (acc.isEmpty) queue.take.flatMap {
88+
case Right(value) => loop(Chunk.singleton(value).asRight)
89+
case Left(err) => loop(err.asLeft)
90+
}
91+
else F.pure((acc.toIndexedChunk, zero).some)
92+
93+
updateRequests *> await
94+
}
7195
}
7296

73-
Stream.repeatEval(run).unNoneTerminate
74-
97+
Stream.unfoldChunkEval(zero)(loop)
7598
}
76-
7799
}
78100

79101
}

runtime/src/test/scala/fs2/grpc/server/ServerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ class ServerSuite extends Fs2GrpcSuite {
360360

361361
tc.tick()
362362

363-
assertEquals(dummy.requested, 1)
363+
assertEquals(dummy.requested, 2)
364364

365365
listener.onMessage("1")
366366
tc.tick()

runtime/src/test/scala/fs2/grpc/client/StreamIngestSuite.scala renamed to runtime/src/test/scala/fs2/grpc/shared/StreamIngestSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
package fs2
2323
package grpc
24-
package client
24+
package shared
2525

2626
import cats.effect._
2727
import munit._
@@ -45,9 +45,9 @@ class StreamIngestSuite extends CatsEffectSuite with CatsEffectFunFixtures {
4545

4646
run(prefetchN = 1, takeN = 1, expectedReq = 1, expectedCount = 1) *>
4747
run(prefetchN = 2, takeN = 1, expectedReq = 2, expectedCount = 1) *>
48-
run(prefetchN = 2, takeN = 2, expectedReq = 3, expectedCount = 2) *>
49-
run(prefetchN = 1024, takeN = 1024, expectedReq = 2047, expectedCount = 1024) *>
50-
run(prefetchN = 1024, takeN = 1023, expectedReq = 2046, expectedCount = 1023)
48+
run(prefetchN = 2, takeN = 2, expectedReq = 2, expectedCount = 2) *>
49+
run(prefetchN = 1024, takeN = 1024, expectedReq = 1024, expectedCount = 1024) *>
50+
run(prefetchN = 1024, takeN = 1023, expectedReq = 1024, expectedCount = 1023)
5151

5252
}
5353

0 commit comments

Comments
 (0)