Skip to content

Commit ebe8644

Browse files
committed
Expose 'prefetchN' parameter in 'ServerOptions'. Move 'f.g.{client => shared}.StreamIngest'.
1 parent ce56120 commit ebe8644

File tree

6 files changed

+20
-6
lines changed

6 files changed

+20
-6
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ inThisBuild(
4141
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.grpc.client.StreamIngest.create"),
4242
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.grpc.server.Fs2StreamServerCallListener*"),
4343
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.grpc.client.Fs2StreamClientCallListener*"),
44+
ProblemFilters.exclude[MissingClassProblem]("fs2.grpc.client.StreamIngest*"),
4445
ProblemFilters.exclude[MissingClassProblem]("fs2.grpc.codegen.Fs2GrpcServicePrinter$constants$"),
4546
ProblemFilters.exclude[MissingFieldProblem]("fs2.grpc.codegen.Fs2GrpcServicePrinter.constants"),
4647
// deleted private classes

runtime/src/main/scala/fs2/grpc/client/Fs2StreamClientCallListener.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import cats.effect.SyncIO
2727
import cats.implicits._
2828
import cats.effect.kernel.Concurrent
2929
import cats.effect.std.Dispatcher
30+
import fs2.grpc.shared.StreamIngest
3031
import io.grpc.{ClientCall, Metadata, Status}
3132

3233
private[client] class Fs2StreamClientCallListener[F[_], Response] private (

runtime/src/main/scala/fs2/grpc/server/Fs2StreamServerCallListener.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import cats.syntax.all._
2828
import cats.effect.kernel.Deferred
2929
import cats.effect.{Async, SyncIO}
3030
import cats.effect.std.Dispatcher
31-
import fs2.grpc.client.StreamIngest
31+
import fs2.grpc.shared.StreamIngest
3232
import io.grpc.ServerCall
3333

3434
private[server] class Fs2StreamServerCallListener[F[_], Request, Response] private (
@@ -67,7 +67,8 @@ private[server] object Fs2StreamServerCallListener {
6767
)(implicit F: Async[F]): F[Fs2StreamServerCallListener[F, Request, Response]] = for {
6868
isCancelled <- Deferred[F, Unit]
6969
request = (n: Int) => F.delay(call.request(n))
70-
ingest <- StreamIngest[F, Request](request, prefetchN = 1)
70+
prefetchN = math.max(options.prefetchN, 1)
71+
ingest <- StreamIngest[F, Request](request, prefetchN)
7172
serverCall <- Fs2ServerCall[F, Request, Response](call, options)
7273
} yield new Fs2StreamServerCallListener[F, Request, Response](
7374
ingest,

runtime/src/main/scala/fs2/grpc/server/ServerOptions.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,22 @@ package grpc
2424
package server
2525

2626
sealed abstract class ServerOptions private (
27+
val prefetchN: Int,
2728
val callOptionsFn: ServerCallOptions => ServerCallOptions
2829
) {
2930

3031
private def copy(
31-
callOptionsFn: ServerCallOptions => ServerCallOptions
32-
): ServerOptions = new ServerOptions(callOptionsFn) {}
32+
prefetchN: Int = this.prefetchN,
33+
callOptionsFn: ServerCallOptions => ServerCallOptions = this.callOptionsFn
34+
): ServerOptions = new ServerOptions(prefetchN, callOptionsFn) {}
35+
36+
/** Prefetch up to @param n messages from a client. The server will try to keep the internal buffer filled according
37+
* to the provided value.
38+
*
39+
* If the provided value is less than 1 it defaults to 1.
40+
*/
41+
def withPrefetchN(n: Int): ServerOptions =
42+
copy(prefetchN = math.max(n, 1))
3343

3444
/** Function that is applied on `fs2.grpc.ServerCallOptions.default` for each new RPC call.
3545
*/
@@ -40,6 +50,7 @@ sealed abstract class ServerOptions private (
4050
object ServerOptions {
4151

4252
val default: ServerOptions = new ServerOptions(
53+
prefetchN = 1,
4354
callOptionsFn = identity
4455
) {}
4556

runtime/src/main/scala/fs2/grpc/client/StreamIngest.scala renamed to runtime/src/main/scala/fs2/grpc/shared/StreamIngest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
package fs2
2323
package grpc
24-
package client
24+
package shared
2525

2626
import cats.implicits._
2727
import cats.effect.{Concurrent, Ref}

runtime/src/test/scala/fs2/grpc/client/StreamIngestSuite.scala renamed to runtime/src/test/scala/fs2/grpc/shared/StreamIngestSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
package fs2
2323
package grpc
24-
package client
24+
package shared
2525

2626
import cats.effect._
2727
import munit._

0 commit comments

Comments
 (0)