Skip to content
7 changes: 5 additions & 2 deletions entities/src/main/scala/com/devsisters/shardcake/Config.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.devsisters.shardcake

import sttp.client3.UriContext
import sttp.model.Header
import sttp.model.Uri
import zio._

Expand Down Expand Up @@ -29,7 +30,8 @@ case class Config(
sendTimeout: Duration,
refreshAssignmentsRetryInterval: Duration,
unhealthyPodReportInterval: Duration,
simulateRemotePods: Boolean
simulateRemotePods: Boolean,
managerClientHeaderInterceptor: Seq[Header] => Seq[Header]
)

object Config {
Expand All @@ -44,6 +46,7 @@ object Config {
sendTimeout = 10 seconds,
refreshAssignmentsRetryInterval = 5 seconds,
unhealthyPodReportInterval = 5 seconds,
simulateRemotePods = false
simulateRemotePods = false,
managerClientHeaderInterceptor = headers => headers
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ object ShardManagerClient {
}

class ShardManagerClientLive(sttp: SttpBackend[Task, Any], config: Config) extends ShardManagerClient {
private def send[Origin: IsOperation, A](query: SelectionBuilder[Origin, A]): Task[A] =
sttp.send(query.toRequest(config.shardManagerUri)).map(_.body).absolve
private def send[Origin: IsOperation, A](query: SelectionBuilder[Origin, A]): Task[A] = {
val request = query.toRequest(config.shardManagerUri)
sttp.send(request.copy(headers = config.managerClientHeaderInterceptor(request.headers))).map(_.body).absolve
}

def register(podAddress: PodAddress): Task[Unit] =
send(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.devsisters.shardcake

import zio._
import zio.http.HandlerAspect

/**
* Shard Manager configuration
Expand All @@ -23,7 +24,8 @@ case class ManagerConfig(
persistRetryInterval: Duration,
persistRetryCount: Int,
rebalanceRate: Double,
podHealthCheckInterval: Duration
podHealthCheckInterval: Duration,
httpHandler: HandlerAspect[Any, Unit]
)

object ManagerConfig {
Expand All @@ -37,6 +39,7 @@ object ManagerConfig {
persistRetryInterval = 3 seconds,
persistRetryCount = 100,
rebalanceRate = 2 / 100d,
podHealthCheckInterval = 1 minute
podHealthCheckInterval = 1 minute,
httpHandler = HandlerAspect.identity
)
}
4 changes: 2 additions & 2 deletions manager/src/main/scala/com/devsisters/shardcake/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ object Server {
handlers = QuickAdapter(interpreter).handlers
routes = Routes(
Method.ANY / "health" -> Handler.ok,
Method.ANY / "api" / "graphql" -> handlers.api,
Method.ANY / "ws" / "graphql" -> handlers.webSocket
Method.ANY / "api" / "graphql" -> handlers.api @@ config.httpHandler,
Method.ANY / "ws" / "graphql" -> handlers.webSocket @@ config.httpHandler
) @@ Middleware.cors
_ <- ZIO.logInfo(s"Shard Manager server started on port ${config.apiPort}.")
nothing <- ZServer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.devsisters.shardcake

import zio._
import io.grpc.StatusException
import scalapb.zio_grpc.GTransform
import scalapb.zio_grpc.ZClientInterceptor

import java.util.concurrent.Executor
import scalapb.zio_grpc.RequestContext

/**
* The configuration for the gRPC client.
Expand All @@ -17,10 +20,11 @@ case class GrpcConfig(
maxInboundMessageSize: Int,
executor: Option[Executor],
shutdownTimeout: Duration,
interceptors: Seq[ZClientInterceptor]
interceptors: Seq[ZClientInterceptor],
servicetransform: GTransform[RequestContext, StatusException, RequestContext, StatusException]
)

object GrpcConfig {
val default: GrpcConfig =
GrpcConfig(maxInboundMessageSize = 32 * 1024 * 1024, None, 3.seconds, Seq.empty)
GrpcConfig(maxInboundMessageSize = 32 * 1024 * 1024, None, 3.seconds, Seq.empty, GTransform.identity)
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ object GrpcShardingService {
case None =>
ServerBuilder.forPort(config.shardingPort)
}
services <- ServiceList.add(new GrpcShardingService(sharding, config.sendTimeout) {}).bindAll
services <-
ServiceList
.add(new GrpcShardingService(sharding, config.sendTimeout) {}.transform(grpcConfig.servicetransform))
.bindAll
server: Server = services
.foldLeft(builder) { case (builder0, service) => builder0.addService(service) }
.addService(ProtoReflectionService.newInstance())
Expand Down