Skip to content

Commit 81e68fd

Browse files
Merge pull request #669 from ValdemarGr/main
668 aspect oriented middleware
2 parents be04a70 + eaaeda1 commit 81e68fd

File tree

8 files changed

+937
-93
lines changed

8 files changed

+937
-93
lines changed

build.sbt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ inThisBuild(
3535
) ++ List(
3636
mimaBinaryIssueFilters ++= Seq(
3737
// API that is not extended by end-users
38-
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.grpc.GeneratedCompanion.mkClient"),
38+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.grpc.GeneratedCompanion.mkClientFull"),
39+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.grpc.GeneratedCompanion.serviceBindingFull"),
3940
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.grpc.GeneratedCompanion.serviceDescriptor"),
4041
// package private APIs
4142
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.grpc.client.StreamIngest.create"),
@@ -138,7 +139,13 @@ lazy val e2e = (projectMatrix in file("e2e"))
138139
.settings(
139140
codeGenClasspath := (codeGenJVM212 / Compile / fullClasspath).value,
140141
libraryDependencies := Nil,
141-
libraryDependencies ++= List(scalaPbGrpcRuntime, scalaPbRuntime, scalaPbRuntime % "protobuf", ceMunit % Test),
142+
libraryDependencies ++= List(
143+
scalaPbGrpcRuntime,
144+
scalaPbRuntime,
145+
scalaPbRuntime % "protobuf",
146+
ceMunit % Test,
147+
"io.grpc" % "grpc-inprocess" % versions.grpc % Test
148+
),
142149
Compile / PB.targets := Seq(
143150
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb",
144151
genModule(codegenFullName + "$") -> (Compile / sourceManaged).value / "fs2-grpc"

codegen/src/main/scala/fs2/grpc/codegen/Fs2AbstractServicePrinter.scala

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,12 @@
2222
package fs2.grpc.codegen
2323

2424
import com.google.protobuf.Descriptors.{MethodDescriptor, ServiceDescriptor}
25-
import fs2.grpc.codegen.Fs2AbstractServicePrinter.constants.{
26-
Async,
27-
Channel,
28-
ClientOptions,
29-
Companion,
30-
Ctx,
31-
Dispatcher,
32-
Fs2ClientCall,
33-
Fs2ServerCallHandler,
34-
Metadata,
35-
ServerOptions,
36-
ServerServiceDefinition,
37-
Stream
38-
}
3925
import scalapb.compiler.{DescriptorImplicits, FunctionalPrinter}
4026
import scalapb.compiler.FunctionalPrinter.PrinterEndo
4127
import scalapb.compiler.ProtobufGenerator.asScalaDocBlock
4228

4329
abstract class Fs2AbstractServicePrinter extends Fs2ServicePrinter {
30+
import Fs2AbstractServicePrinter.constants._
4431

4532
val service: ServiceDescriptor
4633
val serviceSuffix: String
@@ -56,39 +43,55 @@ abstract class Fs2AbstractServicePrinter extends Fs2ServicePrinter {
5643

5744
protected[this] def handleMethod(method: MethodDescriptor): String
5845

46+
protected[this] def visitMethod(method: MethodDescriptor): String =
47+
s"visit" + handleMethod(method).capitalize
48+
5949
private[this] def createClientCall(method: MethodDescriptor) = {
6050
val basicClientCall =
61-
s"$Fs2ClientCall[F](channel, ${method.grpcDescriptor.fullName}, dispatcher, clientOptions)"
51+
s"$Fs2ClientCall[G](channel, ${method.grpcDescriptor.fullName}, dispatcher, clientOptions)"
6252
if (method.isServerStreaming)
6353
s"$Stream.eval($basicClientCall)"
6454
else
6555
basicClientCall
6656
}
6757

6858
private[this] def serviceMethodImplementation(method: MethodDescriptor): PrinterEndo = { p =>
69-
val mkMetadata = if (method.isServerStreaming) s"$Stream.eval(mkMetadata(ctx))" else "mkMetadata(ctx)"
59+
val inType = method.inputType.scalaType
60+
val outType = method.outputType.scalaType
61+
val descriptor = method.grpcDescriptor.fullName
7062

71-
p.add(serviceMethodSignature(method) + " = {")
72-
.indent
73-
.add(s"$mkMetadata.flatMap { m =>")
74-
.indent
75-
.add(s"${createClientCall(method)}.flatMap(_.${handleMethod(method)}(request, m))")
76-
.outdent
77-
.add("}")
78-
.outdent
79-
.add("}")
63+
p.add(serviceMethodSignature(method) + " =")
64+
.indented {
65+
_.addStringMargin(
66+
s"""|clientAspect.${visitMethod(method)}[$inType, $outType](
67+
| ${ClientCallContext}(ctx, $descriptor),
68+
| request,
69+
| (req, m) => ${createClientCall(method)}.flatMap(_.${handleMethod(method)}(req, m))
70+
|)""".stripMargin
71+
)
72+
}
8073
}
8174

8275
private[this] def serviceBindingImplementation(method: MethodDescriptor): PrinterEndo = { p =>
8376
val inType = method.inputType.scalaType
8477
val outType = method.outputType.scalaType
8578
val descriptor = method.grpcDescriptor.fullName
86-
val handler = s"$Fs2ServerCallHandler[F](dispatcher, serverOptions).${handleMethod(method)}[$inType, $outType]"
79+
val handler = s"$Fs2ServerCallHandler[G](dispatcher, serverOptions).${handleMethod(method)}[$inType, $outType]"
8780

8881
val serviceCall = s"serviceImpl.${method.name}"
89-
val eval = if (method.isServerStreaming) s"$Stream.eval(mkCtx(m))" else "mkCtx(m)"
9082

91-
p.add(s".addMethod($descriptor, $handler((r, m) => $eval.flatMap($serviceCall(r, _))))")
83+
p.addStringMargin {
84+
s"""|.addMethod(
85+
| $descriptor,
86+
| $handler{ (r, m) =>
87+
| serviceAspect.${visitMethod(method)}[$inType, $outType](
88+
| ${ServiceCallContext}(m, $descriptor),
89+
| r,
90+
| (r, m) => $serviceCall(r, m)
91+
| )
92+
| }
93+
|)"""
94+
}
9295
}
9396

9497
private[this] def serviceMethodImplementations: PrinterEndo =
@@ -129,17 +132,27 @@ abstract class Fs2AbstractServicePrinter extends Fs2ServicePrinter {
129132
}
130133

131134
private[this] def serviceClient: PrinterEndo = {
132-
_.add(
133-
s"def mkClient[F[_]: $Async, $Ctx](dispatcher: $Dispatcher[F], channel: $Channel, mkMetadata: $Ctx => F[$Metadata], clientOptions: $ClientOptions): $serviceNameFs2[F, $Ctx] = new $serviceNameFs2[F, $Ctx] {"
135+
_.addStringMargin(
136+
s"""|def mkClientFull[F[_], G[_]: $Async, $Ctx](
137+
| dispatcher: $Dispatcher[G],
138+
| channel: $Channel,
139+
| clientAspect: ${ClientAspect}[F, G, $Ctx],
140+
| clientOptions: $ClientOptions
141+
|): $serviceNameFs2[F, $Ctx] = new $serviceNameFs2[F, $Ctx] {"""
134142
).indent
135143
.call(serviceMethodImplementations)
136144
.outdent
137145
.add("}")
138146
}
139147

140148
private[this] def serviceBinding: PrinterEndo = {
141-
_.add(
142-
s"protected def serviceBinding[F[_]: $Async, $Ctx](dispatcher: $Dispatcher[F], serviceImpl: $serviceNameFs2[F, $Ctx], mkCtx: $Metadata => F[$Ctx], serverOptions: $ServerOptions): $ServerServiceDefinition = {"
149+
_.addStringMargin(
150+
s"""|protected def serviceBindingFull[F[_], G[_]: $Async, $Ctx](
151+
| dispatcher: $Dispatcher[G],
152+
| serviceImpl: $serviceNameFs2[F, $Ctx],
153+
| serviceAspect: ${ServiceAspect}[F, G, $Ctx],
154+
| serverOptions: $ServerOptions
155+
|) = {"""
143156
).indent
144157
.add(s"$ServerServiceDefinition")
145158
.call(serviceBindingImplementations)
@@ -173,6 +186,8 @@ object Fs2AbstractServicePrinter {
173186
private val fs2Pkg = "_root_.fs2"
174187
private val fs2grpcPkg = "_root_.fs2.grpc"
175188
private val grpcPkg = "_root_.io.grpc"
189+
private val fs2grpcServerPkg = "_root_.fs2.grpc.server"
190+
private val fs2grpcClientPkg = "_root_.fs2.grpc.client"
176191

177192
// /
178193

@@ -194,6 +209,10 @@ object Fs2AbstractServicePrinter {
194209
val Metadata = s"$grpcPkg.Metadata"
195210
val ServiceDescriptor = s"$grpcPkg.ServiceDescriptor"
196211

212+
val ServiceAspect = s"${fs2grpcServerPkg}.ServiceAspect"
213+
val ServiceCallContext = s"${fs2grpcServerPkg}.ServiceCallContext"
214+
val ClientAspect = s"${fs2grpcClientPkg}.ClientAspect"
215+
val ClientCallContext = s"${fs2grpcClientPkg}.ClientCallContext"
197216
}
198217

199218
}

e2e/src/test/resources/TestServiceFs2Grpc.scala.txt

Lines changed: 77 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,37 +24,87 @@ object TestServiceFs2Grpc extends _root_.fs2.grpc.GeneratedCompanion[TestService
2424

2525
def serviceDescriptor: _root_.io.grpc.ServiceDescriptor = hello.world.TestServiceGrpc.SERVICE
2626

27-
def mkClient[F[_]: _root_.cats.effect.Async, A](dispatcher: _root_.cats.effect.std.Dispatcher[F], channel: _root_.io.grpc.Channel, mkMetadata: A => F[_root_.io.grpc.Metadata], clientOptions: _root_.fs2.grpc.client.ClientOptions): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] {
28-
def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] = {
29-
mkMetadata(ctx).flatMap { m =>
30-
_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(request, m))
31-
}
32-
}
33-
def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] = {
34-
mkMetadata(ctx).flatMap { m =>
35-
_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(request, m))
36-
}
37-
}
38-
def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] = {
39-
_root_.fs2.Stream.eval(mkMetadata(ctx)).flatMap { m =>
40-
_root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(request, m))
41-
}
42-
}
43-
def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] = {
44-
_root_.fs2.Stream.eval(mkMetadata(ctx)).flatMap { m =>
45-
_root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[F](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(request, m))
46-
}
47-
}
27+
def mkClientFull[F[_], G[_]: _root_.cats.effect.Async, A](
28+
dispatcher: _root_.cats.effect.std.Dispatcher[G],
29+
channel: _root_.io.grpc.Channel,
30+
clientAspect: _root_.fs2.grpc.client.ClientAspect[F, G, A],
31+
clientOptions: _root_.fs2.grpc.client.ClientOptions
32+
): TestServiceFs2Grpc[F, A] = new TestServiceFs2Grpc[F, A] {
33+
def noStreaming(request: hello.world.TestMessage, ctx: A): F[hello.world.TestMessage] =
34+
clientAspect.visitUnaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage](
35+
_root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
36+
request,
37+
(req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_NO_STREAMING, dispatcher, clientOptions).flatMap(_.unaryToUnaryCall(req, m))
38+
)
39+
def clientStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): F[hello.world.TestMessage] =
40+
clientAspect.visitStreamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage](
41+
_root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
42+
request,
43+
(req, m) => _root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, dispatcher, clientOptions).flatMap(_.streamingToUnaryCall(req, m))
44+
)
45+
def serverStreaming(request: hello.world.TestMessage, ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
46+
clientAspect.visitUnaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage](
47+
_root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
48+
request,
49+
(req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, dispatcher, clientOptions)).flatMap(_.unaryToStreamingCall(req, m))
50+
)
51+
def bothStreaming(request: _root_.fs2.Stream[F, hello.world.TestMessage], ctx: A): _root_.fs2.Stream[F, hello.world.TestMessage] =
52+
clientAspect.visitStreamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage](
53+
_root_.fs2.grpc.client.ClientCallContext(ctx, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
54+
request,
55+
(req, m) => _root_.fs2.Stream.eval(_root_.fs2.grpc.client.Fs2ClientCall[G](channel, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, dispatcher, clientOptions)).flatMap(_.streamingToStreamingCall(req, m))
56+
)
4857
}
4958

50-
protected def serviceBinding[F[_]: _root_.cats.effect.Async, A](dispatcher: _root_.cats.effect.std.Dispatcher[F], serviceImpl: TestServiceFs2Grpc[F, A], mkCtx: _root_.io.grpc.Metadata => F[A], serverOptions: _root_.fs2.grpc.server.ServerOptions): _root_.io.grpc.ServerServiceDefinition = {
59+
protected def serviceBindingFull[F[_], G[_]: _root_.cats.effect.Async, A](
60+
dispatcher: _root_.cats.effect.std.Dispatcher[G],
61+
serviceImpl: TestServiceFs2Grpc[F, A],
62+
serviceAspect: _root_.fs2.grpc.server.ServiceAspect[F, G, A],
63+
serverOptions: _root_.fs2.grpc.server.ServerOptions
64+
) = {
5165
_root_.io.grpc.ServerServiceDefinition
5266
.builder(hello.world.TestServiceGrpc.SERVICE)
53-
.addMethod(hello.world.TestServiceGrpc.METHOD_NO_STREAMING, _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]((r, m) => mkCtx(m).flatMap(serviceImpl.noStreaming(r, _))))
54-
.addMethod(hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING, _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]((r, m) => mkCtx(m).flatMap(serviceImpl.clientStreaming(r, _))))
55-
.addMethod(hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING, _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]((r, m) => _root_.fs2.Stream.eval(mkCtx(m)).flatMap(serviceImpl.serverStreaming(r, _))))
56-
.addMethod(hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING, _root_.fs2.grpc.server.Fs2ServerCallHandler[F](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]((r, m) => _root_.fs2.Stream.eval(mkCtx(m)).flatMap(serviceImpl.bothStreaming(r, _))))
67+
.addMethod(
68+
hello.world.TestServiceGrpc.METHOD_NO_STREAMING,
69+
_root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) =>
70+
serviceAspect.visitUnaryToUnaryCall[hello.world.TestMessage, hello.world.TestMessage](
71+
_root_.fs2.grpc.server.ServiceCallContext(m, hello.world.TestServiceGrpc.METHOD_NO_STREAMING),
72+
r,
73+
(r, m) => serviceImpl.noStreaming(r, m)
74+
)
75+
}
76+
)
77+
.addMethod(
78+
hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING,
79+
_root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) =>
80+
serviceAspect.visitStreamingToUnaryCall[hello.world.TestMessage, hello.world.TestMessage](
81+
_root_.fs2.grpc.server.ServiceCallContext(m, hello.world.TestServiceGrpc.METHOD_CLIENT_STREAMING),
82+
r,
83+
(r, m) => serviceImpl.clientStreaming(r, m)
84+
)
85+
}
86+
)
87+
.addMethod(
88+
hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING,
89+
_root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).unaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) =>
90+
serviceAspect.visitUnaryToStreamingCall[hello.world.TestMessage, hello.world.TestMessage](
91+
_root_.fs2.grpc.server.ServiceCallContext(m, hello.world.TestServiceGrpc.METHOD_SERVER_STREAMING),
92+
r,
93+
(r, m) => serviceImpl.serverStreaming(r, m)
94+
)
95+
}
96+
)
97+
.addMethod(
98+
hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING,
99+
_root_.fs2.grpc.server.Fs2ServerCallHandler[G](dispatcher, serverOptions).streamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage]{ (r, m) =>
100+
serviceAspect.visitStreamingToStreamingCall[hello.world.TestMessage, hello.world.TestMessage](
101+
_root_.fs2.grpc.server.ServiceCallContext(m, hello.world.TestServiceGrpc.METHOD_BOTH_STREAMING),
102+
r,
103+
(r, m) => serviceImpl.bothStreaming(r, m)
104+
)
105+
}
106+
)
57107
.build()
58108
}
59109

60-
}
110+
}

0 commit comments

Comments
 (0)