Skip to content

Commit 72591f8

Browse files
Adding metadata to shardcake's internal calls. (#155)
* Example of adding metadata to shardcake's internal calls. * Updating var name. * Renaming GrpcConfig vars * Moving HandlerAspect to an argument in Server.run. * Updating comments. * Removing unused changes. * Adding serverInterceptors implementation. * Adding tests * Adding tests
1 parent 5ea34ff commit 72591f8

File tree

9 files changed

+197
-49
lines changed

9 files changed

+197
-49
lines changed

examples/src/main/scala/example/complex/ShardManagerApp.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@ import zio._
66

77
object ShardManagerApp extends ZIOAppDefault {
88
def run: Task[Nothing] =
9-
Server.run.provide(
10-
ZLayer.succeed(ManagerConfig.default),
11-
ZLayer.succeed(GrpcConfig.default),
12-
ZLayer.succeed(RedisConfig.default),
13-
redis,
14-
StorageRedis.live, // store data in Redis
15-
PodsHealth.local, // just ping a pod to see if it's alive
16-
GrpcPods.live, // use gRPC protocol
17-
ShardManager.live // Shard Manager logic
18-
)
9+
Server
10+
.run()
11+
.provide(
12+
ZLayer.succeed(ManagerConfig.default),
13+
ZLayer.succeed(GrpcConfig.default),
14+
ZLayer.succeed(RedisConfig.default),
15+
redis,
16+
StorageRedis.live, // store data in Redis
17+
PodsHealth.local, // just ping a pod to see if it's alive
18+
GrpcPods.live, // use gRPC protocol
19+
ShardManager.live // Shard Manager logic
20+
)
1921
}

examples/src/main/scala/example/simple/ShardManagerApp.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import zio._
66

77
object ShardManagerApp extends ZIOAppDefault {
88
def run: Task[Nothing] =
9-
Server.run.provide(
10-
ZLayer.succeed(ManagerConfig.default),
11-
ZLayer.succeed(GrpcConfig.default),
12-
PodsHealth.local, // just ping a pod to see if it's alive
13-
GrpcPods.live, // use gRPC protocol
14-
Storage.memory, // store data in memory
15-
ShardManager.live // Shard Manager logic
16-
)
9+
Server
10+
.run()
11+
.provide(
12+
ZLayer.succeed(ManagerConfig.default),
13+
ZLayer.succeed(GrpcConfig.default),
14+
PodsHealth.local, // just ping a pod to see if it's alive
15+
GrpcPods.live, // use gRPC protocol
16+
Storage.memory, // store data in memory
17+
ShardManager.live // Shard Manager logic
18+
)
1719
}

examples/src/test/scala/example/EndToEndSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.util.Try
2525
object EndToEndSpec extends ZIOSpecDefault {
2626

2727
val shardManagerServer: ZLayer[ShardManager with ManagerConfig, Throwable, Unit] =
28-
ZLayer(Server.run.forkDaemon *> ClockLive.sleep(3 seconds).unit)
28+
ZLayer(Server.run().forkDaemon *> ClockLive.sleep(3 seconds).unit)
2929

3030
val container: ZLayer[Any, Nothing, GenericContainer] =
3131
ZLayer.scoped {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package example
2+
3+
import com.devsisters.shardcake._
4+
import com.devsisters.shardcake.interfaces.{ Pods, Storage }
5+
import io.grpc.{ Metadata, Status }
6+
import scalapb.zio_grpc.{ ZClientInterceptor, ZTransform }
7+
import zio.test._
8+
import zio.{ Config => _, _ }
9+
10+
object GrpcAuthExampleSpec extends ZIOSpecDefault {
11+
12+
private val validAuthenticationKey = "validAuthenticationKey"
13+
14+
private val authKey = Metadata.Key.of("authentication-key", io.grpc.Metadata.ASCII_STRING_MARSHALLER)
15+
16+
private val config = ZLayer.succeed(Config.default.copy(simulateRemotePods = true))
17+
18+
private def grpcConfigLayer(clientAuthKey: String): ULayer[GrpcConfig] =
19+
ZLayer.succeed(
20+
GrpcConfig.default.copy(
21+
clientInterceptors = Seq(
22+
ZClientInterceptor.headersUpdater((_, _, md) => md.put(authKey, clientAuthKey).unit)
23+
),
24+
serverInterceptors = Seq(
25+
ZTransform { requestContext =>
26+
for {
27+
authenticated <- requestContext.metadata.get(authKey).map(_.contains(validAuthenticationKey))
28+
_ <- ZIO.when(!authenticated)(ZIO.fail(Status.UNAUTHENTICATED.asException))
29+
} yield requestContext
30+
}
31+
)
32+
)
33+
)
34+
35+
def spec: Spec[TestEnvironment with Scope, Any] =
36+
suite("GrpcAuthExampleSpec")(
37+
test("auth example for gRPC") {
38+
val podAddress = PodAddress("localhost", 54321)
39+
ZIO.scoped {
40+
for {
41+
_ <- Sharding.registerScoped
42+
podsClient <- ZIO.service[Pods]
43+
invalidPodsClient <- ZIO
44+
.service[Pods]
45+
.provide(
46+
grpcConfigLayer("invalid"),
47+
GrpcPods.live
48+
)
49+
validRequest <- podsClient.ping(podAddress).exit
50+
invalidRequest <- invalidPodsClient.ping(podAddress).exit
51+
} yield assertTrue(validRequest.isSuccess, invalidRequest.isFailure)
52+
}
53+
}
54+
).provide(
55+
ShardManagerClient.local,
56+
Storage.memory,
57+
config,
58+
grpcConfigLayer(validAuthenticationKey),
59+
Sharding.live,
60+
KryoSerialization.live,
61+
GrpcPods.live,
62+
GrpcShardingService.live
63+
)
64+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package example
2+
3+
import com.devsisters.shardcake.{ Config, ManagerConfig, Server, ShardManager, ShardManagerClient }
4+
import com.devsisters.shardcake.interfaces.{ Pods, PodsHealth, Storage }
5+
import sttp.client3.SttpBackend
6+
import sttp.client3.asynchttpclient.zio.AsyncHttpClientZioBackend
7+
import sttp.client3.httpclient.zio.ZioWebSocketsStreams
8+
import zio.Clock.ClockLive
9+
import zio.http.{ Header, Middleware }
10+
import zio.test._
11+
import zio.{ Config => _, _ }
12+
13+
object ShardManagerAuthExampleSpec extends ZIOSpecDefault {
14+
15+
val validToken = "validBearerToken"
16+
17+
val shardManagerServerLayer: ZLayer[ManagerConfig, Throwable, Unit] =
18+
ZLayer.makeSome[ManagerConfig, Unit](
19+
ZLayer(
20+
Server
21+
.run(Middleware.bearerAuthZIO(secret => ZIO.succeed(secret.stringValue.equals(validToken))))
22+
.forkDaemon *> ClockLive.sleep(3 seconds).unit
23+
),
24+
Storage.memory,
25+
ShardManager.live,
26+
Pods.noop,
27+
PodsHealth.noop
28+
)
29+
30+
def sttpBackendWithAuthTokenLayer(token: String): ZLayer[Scope, Throwable, SttpBackend[Task, ZioWebSocketsStreams]] =
31+
ZLayer {
32+
val authHeader = Header.Authorization.Bearer(token)
33+
AsyncHttpClientZioBackend.scoped(customizeRequest =
34+
builder => builder.addHeader(authHeader.headerName, authHeader.renderedValue)
35+
)
36+
}
37+
38+
def spec: Spec[TestEnvironment, Any] =
39+
suite("ShardManagerAuthSpec")(
40+
test("auth example for shard manager") {
41+
ZIO.scoped {
42+
for {
43+
validClient <- ZIO
44+
.service[ShardManagerClient]
45+
.provideSome[Config & Scope](
46+
sttpBackendWithAuthTokenLayer(validToken),
47+
ShardManagerClient.live
48+
)
49+
invalidClient <- ZIO
50+
.service[ShardManagerClient]
51+
.provideSome[Config & Scope](
52+
sttpBackendWithAuthTokenLayer("invalid"),
53+
ShardManagerClient.live
54+
)
55+
validRequest <- validClient.getAssignments.exit
56+
invalidRequest <- invalidClient.getAssignments.exit
57+
} yield assertTrue(validRequest.isSuccess, invalidRequest.isFailure)
58+
}
59+
}
60+
).provide(
61+
shardManagerServerLayer,
62+
ZLayer.succeed(Config.default),
63+
ZLayer.succeed(ManagerConfig.default)
64+
)
65+
}

manager/src/main/scala/com/devsisters/shardcake/Server.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,17 @@ object Server {
1010
/**
1111
* Start an HTTP server that exposes the Shard Manager GraphQL API
1212
*/
13-
val run: RIO[ShardManager with ManagerConfig, Nothing] =
13+
def run(
14+
httpHandler: HandlerAspect[Any, Unit] = HandlerAspect.identity
15+
): RIO[ShardManager with ManagerConfig, Nothing] =
1416
for {
1517
config <- ZIO.service[ManagerConfig]
1618
interpreter <- (GraphQLApi.api @@ printErrors).interpreter
1719
handlers = QuickAdapter(interpreter).handlers
1820
routes = Routes(
1921
Method.ANY / "health" -> Handler.ok,
20-
Method.ANY / "api" / "graphql" -> handlers.api,
21-
Method.ANY / "ws" / "graphql" -> handlers.webSocket
22+
Method.ANY / "api" / "graphql" -> handlers.api @@ httpHandler,
23+
Method.ANY / "ws" / "graphql" -> handlers.webSocket @@ httpHandler
2224
) @@ Middleware.cors
2325
_ <- ZIO.logInfo(s"Shard Manager server started on port ${config.apiPort}.")
2426
nothing <- ZServer
Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.devsisters.shardcake
22

33
import zio._
4+
import scalapb.zio_grpc.RequestContext
45
import scalapb.zio_grpc.ZClientInterceptor
6+
import scalapb.zio_grpc.ZTransform
57

68
import java.util.concurrent.Executor
79

@@ -11,16 +13,18 @@ import java.util.concurrent.Executor
1113
* @param maxInboundMessageSize the maximum message size allowed to be received by the grpc client
1214
* @param executor a custom executor to pass to grpc-java when creating gRPC clients and servers
1315
* @param shutdownTimeout the timeout to wait for the gRPC server to shutdown before forcefully shutting it down
14-
* @param interceptors the interceptors to be used by the gRPC client, e.g for adding tracing or logging
16+
* @param clientInterceptors the interceptors to be used by the gRPC client, e.g for adding tracing or logging
17+
* @param serverInterceptors the interceptors to be used by the gRPC Server, e.g for adding tracing or logging
1518
*/
1619
case class GrpcConfig(
1720
maxInboundMessageSize: Int,
1821
executor: Option[Executor],
1922
shutdownTimeout: Duration,
20-
interceptors: Seq[ZClientInterceptor]
23+
clientInterceptors: Seq[ZClientInterceptor],
24+
serverInterceptors: Seq[ZTransform[RequestContext, RequestContext]]
2125
)
2226

2327
object GrpcConfig {
2428
val default: GrpcConfig =
25-
GrpcConfig(maxInboundMessageSize = 32 * 1024 * 1024, None, 3.seconds, Seq.empty)
29+
GrpcConfig(maxInboundMessageSize = 32 * 1024 * 1024, None, 3.seconds, Seq.empty, Seq.empty)
2630
}

protocol-grpc/src/main/scala/com/devsisters/shardcake/GrpcPods.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class GrpcPods(
4141
}
4242
}
4343

44-
val channel = ZManagedChannel(builder, config.interceptors)
44+
val channel = ZManagedChannel(builder, config.clientInterceptors)
4545
// create a fiber that never ends and keeps the connection alive
4646
for {
4747
_ <- ZIO.logDebug(s"Opening connection to pod $pod")

protocol-grpc/src/main/scala/com/devsisters/shardcake/GrpcShardingService.scala

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -84,29 +84,38 @@ object GrpcShardingService {
8484
val live: ZLayer[Config with Sharding with GrpcConfig, Throwable, Unit] =
8585
ZLayer.scoped[Config with Sharding with GrpcConfig] {
8686
for {
87-
config <- ZIO.service[Config]
88-
grpcConfig <- ZIO.service[GrpcConfig]
89-
sharding <- ZIO.service[Sharding]
90-
builder = grpcConfig.executor match {
91-
case Some(executor) =>
92-
ServerBuilder
93-
.forPort(config.shardingPort)
94-
.executor(executor)
95-
case None =>
96-
ServerBuilder.forPort(config.shardingPort)
97-
}
98-
services <- ServiceList.add(new GrpcShardingService(sharding, config.sendTimeout) {}).bindAll
99-
server: Server = services
100-
.foldLeft(builder) { case (builder0, service) => builder0.addService(service) }
101-
.addService(ProtoReflectionService.newInstance())
102-
.build()
103-
_ <- ZIO.acquireRelease(ZIO.attempt(server.start()))(server =>
104-
ZIO.attemptBlocking {
105-
server.shutdown()
106-
server.awaitTermination(grpcConfig.shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)
107-
server.shutdownNow()
108-
}.ignore
109-
)
87+
config <- ZIO.service[Config]
88+
grpcConfig <- ZIO.service[GrpcConfig]
89+
sharding <- ZIO.service[Sharding]
90+
builder = grpcConfig.executor match {
91+
case Some(executor) =>
92+
ServerBuilder
93+
.forPort(config.shardingPort)
94+
.executor(executor)
95+
case None =>
96+
ServerBuilder.forPort(config.shardingPort)
97+
}
98+
grpcShardingService = new GrpcShardingService(sharding, config.sendTimeout) {}
99+
services <-
100+
ServiceList
101+
.add(
102+
grpcConfig.serverInterceptors
103+
.reduceOption(_.andThen(_))
104+
.map(t => grpcShardingService.transform(t))
105+
.getOrElse(grpcShardingService.asGeneric)
106+
)
107+
.bindAll
108+
server: Server = services
109+
.foldLeft(builder) { case (builder0, service) => builder0.addService(service) }
110+
.addService(ProtoReflectionService.newInstance())
111+
.build()
112+
_ <- ZIO.acquireRelease(ZIO.attempt(server.start()))(server =>
113+
ZIO.attemptBlocking {
114+
server.shutdown()
115+
server.awaitTermination(grpcConfig.shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)
116+
server.shutdownNow()
117+
}.ignore
118+
)
110119
} yield ()
111120
}
112121
}

0 commit comments

Comments
 (0)