Skip to content
7 changes: 5 additions & 2 deletions entities/src/main/scala/com/devsisters/shardcake/Config.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.devsisters.shardcake

import sttp.client3.UriContext
import sttp.model.Header
import sttp.model.Uri
import zio._

Expand Down Expand Up @@ -29,7 +30,8 @@ case class Config(
sendTimeout: Duration,
refreshAssignmentsRetryInterval: Duration,
unhealthyPodReportInterval: Duration,
simulateRemotePods: Boolean
simulateRemotePods: Boolean,
managerClientHeaderInterceptor: Seq[Header] => Seq[Header]
)

object Config {
Expand All @@ -44,6 +46,7 @@ object Config {
sendTimeout = 10 seconds,
refreshAssignmentsRetryInterval = 5 seconds,
unhealthyPodReportInterval = 5 seconds,
simulateRemotePods = false
simulateRemotePods = false,
managerClientHeaderInterceptor = headers => headers
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ object ShardManagerClient {
}

class ShardManagerClientLive(sttp: SttpBackend[Task, Any], config: Config) extends ShardManagerClient {
private def send[Origin: IsOperation, A](query: SelectionBuilder[Origin, A]): Task[A] =
sttp.send(query.toRequest(config.shardManagerUri)).map(_.body).absolve
private def send[Origin: IsOperation, A](query: SelectionBuilder[Origin, A]): Task[A] = {
val request = query.toRequest(config.shardManagerUri)
sttp.send(request.copy(headers = config.managerClientHeaderInterceptor(request.headers))).map(_.body).absolve
}

def register(podAddress: PodAddress): Task[Unit] =
send(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.devsisters.shardcake

import zio._
import zio.http.HandlerAspect

/**
* Shard Manager configuration
Expand All @@ -23,7 +24,8 @@ case class ManagerConfig(
persistRetryInterval: Duration,
persistRetryCount: Int,
rebalanceRate: Double,
podHealthCheckInterval: Duration
podHealthCheckInterval: Duration,
httpHandler: HandlerAspect[Any, Unit]
)

object ManagerConfig {
Expand All @@ -37,6 +39,7 @@ object ManagerConfig {
persistRetryInterval = 3 seconds,
persistRetryCount = 100,
rebalanceRate = 2 / 100d,
podHealthCheckInterval = 1 minute
podHealthCheckInterval = 1 minute,
httpHandler = HandlerAspect.identity
)
}
4 changes: 2 additions & 2 deletions manager/src/main/scala/com/devsisters/shardcake/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ object Server {
handlers = QuickAdapter(interpreter).handlers
routes = Routes(
Method.ANY / "health" -> Handler.ok,
Method.ANY / "api" / "graphql" -> handlers.api,
Method.ANY / "ws" / "graphql" -> handlers.webSocket
Method.ANY / "api" / "graphql" -> handlers.api @@ config.httpHandler,
Method.ANY / "ws" / "graphql" -> handlers.webSocket @@ config.httpHandler
) @@ Middleware.cors
_ <- ZIO.logInfo(s"Shard Manager server started on port ${config.apiPort}.")
nothing <- ZServer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.devsisters.shardcake

import zio._
import io.grpc.StatusException
import scalapb.zio_grpc.GTransform
import scalapb.zio_grpc.ZClientInterceptor

import java.util.concurrent.Executor
import scalapb.zio_grpc.RequestContext

/**
* The configuration for the gRPC client.
Expand All @@ -17,10 +20,11 @@ case class GrpcConfig(
maxInboundMessageSize: Int,
executor: Option[Executor],
shutdownTimeout: Duration,
interceptors: Seq[ZClientInterceptor]
interceptors: Seq[ZClientInterceptor],
serviceTransform: GTransform[RequestContext, StatusException, RequestContext, StatusException]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks ok, how about renaming interceptors to clientInterceptors and serviceTransform to serverInterceptors?

Even though identity (and similarly the empty aspect) does nothing, a list might be a little better? So that we don't call anything additional when we don't need it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have renamed the var names but I have not updated how the GrpcShardingService adds the serviceTransform to the service. I will work on this tomorrow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the code to add the serverInterceptors. I will work on adding some tests next.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ghostdogpr I have added some tests for adding metadata to shardcake's internal calls. I put them in the example subproject because it had access to all of the code that I needed but I am not sure if that is the right place for them.

Can you review these changes again? Also, is there any documentation that I need to update for this change?

)

object GrpcConfig {
val default: GrpcConfig =
GrpcConfig(maxInboundMessageSize = 32 * 1024 * 1024, None, 3.seconds, Seq.empty)
GrpcConfig(maxInboundMessageSize = 32 * 1024 * 1024, None, 3.seconds, Seq.empty, GTransform.identity)
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,10 @@ object GrpcShardingService {
case None =>
ServerBuilder.forPort(config.shardingPort)
}
services <- ServiceList.add(new GrpcShardingService(sharding, config.sendTimeout) {}).bindAll
services <-
ServiceList
.add(new GrpcShardingService(sharding, config.sendTimeout) {}.transform(grpcConfig.serviceTransform))
.bindAll
server: Server = services
.foldLeft(builder) { case (builder0, service) => builder0.addService(service) }
.addService(ProtoReflectionService.newInstance())
Expand Down