Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,27 @@ lazy val e2e = (projectMatrix in file("e2e"))
val output = (Compile / sourceManaged).value / "fs2-grpc" / "disable-trailers"
}

val renderContextAsImplicit = new {
val args = Seq(
"fs2_grpc:service_suffix=Fs2GrpcRenderContextAsImplicit",
"fs2_grpc:render_context_as_implicit"
) ++ (if (tlIsScala3.value) Seq("scala3_sources") else Nil)

val output = (Compile / sourceManaged).value / "fs2-grpc" / "render-context-as-implicit"
}

Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb",
genModule(codegenFullName + "$") -> (Compile / sourceManaged).value / "fs2-grpc",
(genModule(codegenFullName + "$"), disableTrailers.args) -> disableTrailers.output
(genModule(codegenFullName + "$"), disableTrailers.args) -> disableTrailers.output,
(genModule(codegenFullName + "$"), renderContextAsImplicit.args) -> renderContextAsImplicit.output
)
},
buildInfoPackage := "fs2.grpc.e2e.buildinfo",
buildInfoKeys := Seq[BuildInfoKey]("sourceManaged" -> (Compile / sourceManaged).value / "fs2-grpc"),
buildInfoKeys := Seq[BuildInfoKey](
"sourceManaged" -> (Compile / sourceManaged).value / "fs2-grpc",
"scalaVersion" -> scalaVersion.value
),
githubWorkflowArtifactUpload := false,
scalacOptions := {
if (tlIsScala3.value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ abstract class Fs2AbstractServicePrinter extends Fs2ServicePrinter {
val service: ServiceDescriptor
val serviceSuffix: String
val di: DescriptorImplicits
protected[this] val renderContextAsImplicit: Boolean = false
protected[this] val scala3Sources: Boolean = false

import di._

Expand All @@ -55,6 +57,18 @@ abstract class Fs2AbstractServicePrinter extends Fs2ServicePrinter {
basicClientCall
}

protected[this] def renderCtxParameter(): String = {
if (renderContextAsImplicit) {
if (scala3Sources) {
s")(using ctx: $Ctx"
} else {
s")(implicit ctx: $Ctx"
}
} else {
s", ctx: $Ctx"
}
}

private[this] def serviceMethodImplementation(method: MethodDescriptor): PrinterEndo = { p =>
val inType = method.inputType.scalaType
val outType = method.outputType.scalaType
Expand All @@ -79,6 +93,15 @@ abstract class Fs2AbstractServicePrinter extends Fs2ServicePrinter {
val handler = s"$Fs2ServerCallHandler[G](dispatcher, serverOptions).${handleMethod(method)}[$inType, $outType]"

val serviceCall = s"serviceImpl.${method.name}"
val invoke = if (renderContextAsImplicit) {
if (scala3Sources) {
s"$serviceCall(r)(using m)"
} else {
s"$serviceCall(r)(m)"
}
} else {
s"$serviceCall(r, m)"
}

p.addStringMargin {
s"""|.addMethod(
Expand All @@ -87,7 +110,7 @@ abstract class Fs2AbstractServicePrinter extends Fs2ServicePrinter {
| serviceAspect.${visitMethod(method)}[$inType, $outType](
| ${ServiceCallContext}(m, $descriptor),
| r,
| (r, m) => $serviceCall(r, m)
| (r, m) => $invoke
| )
| }
|)"""
Expand Down
39 changes: 34 additions & 5 deletions codegen/src/main/scala/fs2/grpc/codegen/Fs2CodeGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,42 @@ import scala.jdk.CollectionConverters.*
sealed trait Fs2Params {
def serviceSuffix: String
def disableTrailers: Boolean
def renderContextAsImplicit: Boolean
def scala3Sources: Boolean

def withServiceSuffix(serviceSuffix: String): Fs2Params
def withDisableTrailers(value: Boolean): Fs2Params
def withRenderContextAsImplicit(value: Boolean): Fs2Params
def withScala3Sources(value: Boolean): Fs2Params
}

object Fs2Params {

def default: Fs2Params =
Impl(
serviceSuffix = "Fs2Grpc",
disableTrailers = false
disableTrailers = false,
renderContextAsImplicit = false,
scala3Sources = false
)

private final case class Impl(
serviceSuffix: String,
disableTrailers: Boolean
disableTrailers: Boolean,
renderContextAsImplicit: Boolean,
scala3Sources: Boolean
) extends Fs2Params {
def withServiceSuffix(serviceSuffix: String): Fs2Params =
copy(serviceSuffix = serviceSuffix)

def withDisableTrailers(value: Boolean): Fs2Params =
copy(disableTrailers = value)

def withRenderContextAsImplicit(value: Boolean): Fs2Params =
copy(renderContextAsImplicit = value)

def withScala3Sources(value: Boolean): Fs2Params =
copy(scala3Sources = value)
}
}

Expand Down Expand Up @@ -92,7 +106,13 @@ object Fs2CodeGenerator extends CodeGenApp {
service,
fs2params.serviceSuffix + "Trailers",
di,
new Fs2GrpcExhaustiveTrailersServicePrinter(_, fs2params.serviceSuffix + "Trailers", di)
new Fs2GrpcExhaustiveTrailersServicePrinter(
_,
fs2params.serviceSuffix + "Trailers",
fs2params.renderContextAsImplicit,
fs2params.scala3Sources,
di
)
)
)

Expand All @@ -102,7 +122,13 @@ object Fs2CodeGenerator extends CodeGenApp {
service,
fs2params.serviceSuffix,
di,
new Fs2GrpcServicePrinter(_, fs2params.serviceSuffix, di)
new Fs2GrpcServicePrinter(
_,
fs2params.serviceSuffix,
fs2params.renderContextAsImplicit,
fs2params.scala3Sources,
di
)
)

trailers :+ general
Expand All @@ -114,9 +140,11 @@ object Fs2CodeGenerator extends CodeGenApp {
paramsAndUnparsed <- GeneratorParams.fromStringCollectUnrecognized(params)
params = paramsAndUnparsed._1
unparsed = paramsAndUnparsed._2
suffix <- unparsed.map(_.split("=", 2).toList).foldLeft[Either[String, Fs2Params]](Right(Fs2Params.default)) {
initial = Fs2Params.default.withScala3Sources(params.scala3Sources)
suffix <- unparsed.map(_.split("=", 2).toList).foldLeft[Either[String, Fs2Params]](Right(initial)) {
case (Right(params), ServiceSuffix :: suffix :: Nil) => Right(params.withServiceSuffix(suffix))
case (Right(params), DisableTrailers :: Nil) => Right(params.withDisableTrailers(true))
case (Right(params), RenderContextAsImplicit :: Nil) => Right(params.withRenderContextAsImplicit(true))
case (Right(_), ServiceSuffixPluginKey :: _ :: Nil) =>
Left(s"The '$ServiceSuffixPluginKey' is replaced with '$ServiceSuffix'")
case (Right(_), xs) => Left(s"Unrecognized parameter: $xs")
Expand Down Expand Up @@ -152,4 +180,5 @@ object Fs2CodeGenerator extends CodeGenApp {
private[codegen] val ServiceSuffixPluginKey: String = "serviceSuffix"
private[codegen] val ServiceSuffix: String = "fs2_grpc:service_suffix"
private[codegen] val DisableTrailers: String = "fs2_grpc:disable_trailers"
private[codegen] val RenderContextAsImplicit: String = "fs2_grpc:render_context_as_implicit"
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import scalapb.compiler.{DescriptorImplicits, StreamType}
class Fs2GrpcExhaustiveTrailersServicePrinter(
val service: ServiceDescriptor,
val serviceSuffix: String,
override val renderContextAsImplicit: Boolean,
override val scala3Sources: Boolean,
val di: DescriptorImplicits
) extends Fs2AbstractServicePrinter {
import fs2.grpc.codegen.Fs2AbstractServicePrinter.constants._
Expand All @@ -36,13 +38,13 @@ class Fs2GrpcExhaustiveTrailersServicePrinter(

val scalaInType = method.inputType.scalaType
val scalaOutType = method.outputType.scalaType
val ctx = s"ctx: $Ctx"
val ctx = renderCtxParameter()

s"def ${method.name}" + (method.streamType match {
case StreamType.Unary => s"(request: $scalaInType, $ctx): F[($scalaOutType, $Metadata)]"
case StreamType.ClientStreaming => s"(request: $Stream[F, $scalaInType], $ctx): F[($scalaOutType, $Metadata)]"
case StreamType.ServerStreaming => s"(request: $scalaInType, $ctx): $Stream[F, $scalaOutType]"
case StreamType.Bidirectional => s"(request: $Stream[F, $scalaInType], $ctx): $Stream[F, $scalaOutType]"
case StreamType.Unary => s"(request: $scalaInType$ctx): F[($scalaOutType, $Metadata)]"
case StreamType.ClientStreaming => s"(request: $Stream[F, $scalaInType]$ctx): F[($scalaOutType, $Metadata)]"
case StreamType.ServerStreaming => s"(request: $scalaInType$ctx): $Stream[F, $scalaOutType]"
case StreamType.Bidirectional => s"(request: $Stream[F, $scalaInType]$ctx): $Stream[F, $scalaOutType]"
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,27 @@ package fs2.grpc.codegen
import com.google.protobuf.Descriptors.{MethodDescriptor, ServiceDescriptor}
import scalapb.compiler.{DescriptorImplicits, StreamType}

class Fs2GrpcServicePrinter(val service: ServiceDescriptor, val serviceSuffix: String, val di: DescriptorImplicits)
extends Fs2AbstractServicePrinter {
class Fs2GrpcServicePrinter(
val service: ServiceDescriptor,
val serviceSuffix: String,
override val renderContextAsImplicit: Boolean,
override val scala3Sources: Boolean,
val di: DescriptorImplicits
) extends Fs2AbstractServicePrinter {
import fs2.grpc.codegen.Fs2AbstractServicePrinter.constants._
import di._

override protected def serviceMethodSignature(method: MethodDescriptor): String = {

val scalaInType = method.inputType.scalaType
val scalaOutType = method.outputType.scalaType
val ctx = s"ctx: $Ctx"
val ctx = renderCtxParameter()

s"def ${method.name}" + (method.streamType match {
case StreamType.Unary => s"(request: $scalaInType, $ctx): F[$scalaOutType]"
case StreamType.ClientStreaming => s"(request: $Stream[F, $scalaInType], $ctx): F[$scalaOutType]"
case StreamType.ServerStreaming => s"(request: $scalaInType, $ctx): $Stream[F, $scalaOutType]"
case StreamType.Bidirectional => s"(request: $Stream[F, $scalaInType], $ctx): $Stream[F, $scalaOutType]"
case StreamType.Unary => s"(request: $scalaInType$ctx): F[$scalaOutType]"
case StreamType.ClientStreaming => s"(request: $Stream[F, $scalaInType]$ctx): F[$scalaOutType]"
case StreamType.ServerStreaming => s"(request: $scalaInType$ctx): $Stream[F, $scalaOutType]"
case StreamType.Bidirectional => s"(request: $Stream[F, $scalaInType]$ctx): $Stream[F, $scalaOutType]"
})
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package hello.world

import _root_.cats.syntax.all._

/** TestService: Example gRPC service used in e2e tests
* It demonstrates all four RPC shapes.
*/
trait TestServiceFs2GrpcRenderContextAsImplicit[F[_], A] {
/** Unary RPC: no streaming in either direction
*/
def noStreaming(request: hello.world.TestMessage)(implicit ctx: A): F[hello.world.TestMessage]
/** Client streaming RPC: client streams, server returns a single response
*/
def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage])(implicit ctx: A): F[hello.world.TestMessage]
/** Server streaming RPC: client sends one request, server streams responses
*/
def serverStreaming(request: hello.world.TestMessage)(implicit ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
/** Bidirectional streaming RPC: both client and server stream
*/
def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage])(implicit ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage]
}

object TestServiceFs2GrpcRenderContextAsImplicit extends _root_.fs2.grpc.GeneratedCompanion[TestServiceFs2GrpcRenderContextAsImplicit] {

def serviceDescriptor: _root_.io.grpc.ServiceDescriptor = hello.world.TestServiceGrpc.SERVICE

def mkClientFull[F[_], G[_]: _root_.cats.effect.Async, A](
dispatcher: _root_.cats.effect.std.Dispatcher[G],
channel: _root_.io.grpc.Channel,
clientAspect: _root_.fs2.grpc.client.ClientAspect[F, G, A],
clientOptions: _root_.fs2.grpc.client.ClientOptions
): TestServiceFs2GrpcRenderContextAsImplicit[F, A] = new TestServiceFs2GrpcRenderContextAsImplicit[F, A] {
def noStreaming(request: hello.world.TestMessage)(implicit ctx: A): F[hello.world.TestMessage] =
clientAspect.visitUnaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage](
_root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
request,
(req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(req, m))
)
def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage])(implicit ctx: A): F[hello.world.TestMessage] =
clientAspect.visitStreamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage](
_root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
request,
(req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(req, m))
)
def serverStreaming(request: hello.world.TestMessage)(implicit ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
clientAspect.visitUnaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage](
_root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
request,
(req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(req, m))
)
def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage])(implicit ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
clientAspect.visitStreamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage](
_root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
request,
(req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(req, m))
)
}

protected def serviceBindingFull[F[_], G[_]: _root_.cats.effect.Async, A](
dispatcher: _root_.cats.effect.std.Dispatcher[G],
serviceImpl: TestServiceFs2GrpcRenderContextAsImplicit[F, A],
serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, G, A],
serverOptions: _root_.fs2.grpc.server.ServerOptions
) = {
_root_.io.grpc.ServerServiceDefinition
.builder(hello.world.TestServiceGrpc.SERVICE)
.addMethod(
hello.world.TestServiceGrpc.METHOD_NO_STREAMING,
_root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) =>
serviceAspect.visitUnaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage](
_root_.fs2.grpc.server.ServiceCallContext(m, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
r,
(r, m) => serviceImpl.noStreaming(r)(m)
)
}
)
.addMethod(
hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING,
_root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) =>
serviceAspect.visitStreamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage](
_root_.fs2.grpc.server.ServiceCallContext(m, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
r,
(r, m) => serviceImpl.clientStreaming(r)(m)
)
}
)
.addMethod(
hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING,
_root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) =>
serviceAspect.visitUnaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage](
_root_.fs2.grpc.server.ServiceCallContext(m, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
r,
(r, m) => serviceImpl.serverStreaming(r)(m)
)
}
)
.addMethod(
hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING,
_root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) =>
serviceAspect.visitStreamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage](
_root_.fs2.grpc.server.ServiceCallContext(m, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
r,
(r, m) => serviceImpl.bothStreaming(r)(m)
)
}
)
.build()
}

}
Loading