Skip to content

Commit dbaf6e8

Browse files
committed
Add generic gRPC stream forwarding
To be able to pass Bazel's build event stream (BES) to the same DNS name, without having to add an extra L7 router in front of the bb-storage frontend, add a configuration to forward specific gRPC methods to other backends. No authorization is possible on the passed through messages because Buildbarn has no knowledge about the semantics of the forwarded messages. The gRPC reflection service has also been extended to forward requests that cannot be resolved locally.
1 parent 2d6050f commit dbaf6e8

File tree

10 files changed

+395
-63
lines changed

10 files changed

+395
-63
lines changed

MODULE.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use_repo(
5757
"com_github_gorilla_mux",
5858
"com_github_grpc_ecosystem_go_grpc_middleware",
5959
"com_github_grpc_ecosystem_go_grpc_prometheus",
60+
"com_github_jhump_protoreflect_v2",
6061
"com_github_jmespath_go_jmespath",
6162
"com_github_klauspost_compress",
6263
"com_github_lazybeaver_xorshift",

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ require (
2828
github.com/gorilla/mux v1.8.1
2929
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
3030
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
31+
github.com/jhump/protoreflect/v2 v2.0.0-beta.2
3132
github.com/jmespath/go-jmespath v0.4.0
3233
github.com/klauspost/compress v1.18.1
3334
github.com/lazybeaver/xorshift v0.0.0-20170702203709-ce511d4823dd

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy
198198
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
199199
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg=
200200
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4=
201+
github.com/jhump/protoreflect/v2 v2.0.0-beta.2 h1:qZU+rEZUOYTz1Bnhi3xbwn+VxdXkLVeEpAeZzVXLY88=
202+
github.com/jhump/protoreflect/v2 v2.0.0-beta.2/go.mod h1:4tnOYkB/mq7QTyS3YKtVtNrJv4Psqout8HA1U+hZtgM=
201203
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
202204
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
203205
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=

pkg/grpc/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@ go_library(
2626
"peer_transport_credentials_linux.go",
2727
"proto_trace_attributes_extractor.go",
2828
"proxy_dialer.go",
29+
"reflection_relay.go",
2930
"request_headers_authenticator.go",
3031
"request_metadata_tracing_interceptor.go",
32+
"routing_stream_forwarder.go",
3133
"server.go",
34+
"single_stream_forwarder.go",
3235
"tls_client_certificate_authenticator.go",
3336
],
3437
importpath = "github.com/buildbarn/bb-storage/pkg/grpc",
@@ -49,6 +52,8 @@ go_library(
4952
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
5053
"@com_github_grpc_ecosystem_go_grpc_middleware//:go-grpc-middleware",
5154
"@com_github_grpc_ecosystem_go_grpc_prometheus//:go-grpc-prometheus",
55+
"@com_github_jhump_protoreflect_v2//grpcreflect",
56+
"@com_github_jhump_protoreflect_v2//protoresolve",
5257
"@io_opentelemetry_go_contrib_instrumentation_google_golang_org_grpc_otelgrpc//:otelgrpc",
5358
"@io_opentelemetry_go_otel//attribute",
5459
"@io_opentelemetry_go_otel_trace//:trace",
@@ -63,11 +68,15 @@ go_library(
6368
"@org_golang_google_grpc//metadata",
6469
"@org_golang_google_grpc//peer",
6570
"@org_golang_google_grpc//reflection",
71+
"@org_golang_google_grpc//reflection/grpc_reflection_v1",
72+
"@org_golang_google_grpc//reflection/grpc_reflection_v1alpha",
6673
"@org_golang_google_grpc//status",
6774
"@org_golang_google_grpc_security_advancedtls//:advancedtls",
6875
"@org_golang_google_protobuf//encoding/prototext",
6976
"@org_golang_google_protobuf//proto",
7077
"@org_golang_google_protobuf//reflect/protoreflect",
78+
"@org_golang_google_protobuf//types/known/emptypb",
79+
"@org_golang_x_sync//errgroup",
7180
"@org_golang_x_sync//semaphore",
7281
] + select({
7382
"@rules_go//go/platform:android": [

pkg/grpc/reflection_relay.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package grpc
2+
3+
import (
4+
"context"
5+
"maps"
6+
"strings"
7+
8+
"github.com/buildbarn/bb-storage/pkg/program"
9+
grpcpb "github.com/buildbarn/bb-storage/pkg/proto/configuration/grpc"
10+
"github.com/buildbarn/bb-storage/pkg/util"
11+
"github.com/jhump/protoreflect/v2/grpcreflect"
12+
"github.com/jhump/protoreflect/v2/protoresolve"
13+
v1reflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1"
14+
v1alphareflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
15+
16+
"google.golang.org/grpc"
17+
"google.golang.org/grpc/codes"
18+
"google.golang.org/grpc/reflection"
19+
"google.golang.org/grpc/status"
20+
)
21+
22+
type combinedServiceInfoProvider struct {
23+
server reflection.ServiceInfoProvider
24+
extraServices map[string]grpc.ServiceInfo
25+
}
26+
27+
var _ reflection.ServiceInfoProvider = (*combinedServiceInfoProvider)(nil)
28+
29+
// GetServiceInfo returns the currently available services, which might have
30+
// changed since the creation of this reflection server.
31+
func (p *combinedServiceInfoProvider) GetServiceInfo() map[string]grpc.ServiceInfo {
32+
services := make(map[string]grpc.ServiceInfo)
33+
maps.Copy(services, p.extraServices)
34+
maps.Copy(services, p.server.GetServiceInfo())
35+
return services
36+
}
37+
38+
// registerReflection registers the google.golang.org/grpc/reflection/ service
39+
// on a grpc.Server and calls remote backends in case for relayed services. The
40+
// connections to the backend will run with the backendCtx.
41+
func registerReflection(backendCtx context.Context, s *grpc.Server, serverRelayConfiguration []*grpcpb.ServerRelayConfiguration, group program.Group, grpcClientFactory ClientFactory) error {
42+
// Accumulate all the service names.
43+
relayServices := make(map[string]grpc.ServiceInfo)
44+
for _, relay := range serverRelayConfiguration {
45+
for _, serviceMethod := range relay.Methods {
46+
if !strings.HasPrefix(serviceMethod, "/") {
47+
return status.Errorf(codes.InvalidArgument, "Malformed service method name %q, should start with '/'", serviceMethod)
48+
}
49+
pos := strings.LastIndex(serviceMethod, "/")
50+
if pos == -1 || pos == 0 {
51+
return status.Errorf(codes.InvalidArgument, "Malformed name %q, expected '/' between service and method", serviceMethod)
52+
}
53+
serviceName := serviceMethod[1:pos]
54+
// According to ServiceInfoProvider docs for ServerOptions.Services,
55+
// the reflection service is only interested in the service names.
56+
relayServices[serviceName] = grpc.ServiceInfo{}
57+
}
58+
}
59+
60+
// Make a combined descriptor and extension resolver.
61+
reflectionBackends := []protoresolve.Resolver{}
62+
for relayIdx, relay := range serverRelayConfiguration {
63+
grpcClient, err := grpcClientFactory.NewClientFromConfiguration(relay.Endpoint, group)
64+
if err != nil {
65+
return util.StatusWrapf(err, "Failed to create relay RPC client %d", relayIdx+1)
66+
}
67+
resolver := grpcreflect.NewClientAuto(backendCtx, grpcClient).AsResolver()
68+
reflectionBackends = append(reflectionBackends, resolver)
69+
}
70+
combinedRemoteResolver := protoresolve.Combine(reflectionBackends...)
71+
72+
serverOptions := reflection.ServerOptions{
73+
Services: &combinedServiceInfoProvider{
74+
server: s,
75+
extraServices: relayServices,
76+
},
77+
DescriptorResolver: combinedRemoteResolver,
78+
ExtensionResolver: protoresolve.TypesFromDescriptorPool(combinedRemoteResolver),
79+
}
80+
v1reflectiongrpc.RegisterServerReflectionServer(s, reflection.NewServerV1(serverOptions))
81+
v1alphareflectiongrpc.RegisterServerReflectionServer(s, reflection.NewServer(serverOptions))
82+
return nil
83+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package grpc
2+
3+
import (
4+
"fmt"
5+
6+
"google.golang.org/grpc"
7+
"google.golang.org/grpc/codes"
8+
"google.golang.org/grpc/status"
9+
)
10+
11+
type RoutingStreamForwarder struct {
12+
// A mapping from the full service and method name to forward, to the
13+
// GrpcStreamHandler to be called.
14+
RouteTable map[string]GrpcStreamHandler
15+
}
16+
17+
func NewRoutingStreamForwarder() *RoutingStreamForwarder {
18+
return &RoutingStreamForwarder{
19+
RouteTable: make(map[string]GrpcStreamHandler),
20+
}
21+
}
22+
23+
func (s *RoutingStreamForwarder) HandleStream(method string, stream grpc.ServerStream) error {
24+
if streamHandler, ok := s.RouteTable[method]; ok {
25+
return streamHandler.HandleStream(method, stream)
26+
}
27+
errDesc := fmt.Sprintf("no route for method %v", method)
28+
return status.Error(codes.Unimplemented, errDesc)
29+
}

pkg/grpc/server.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
configuration "github.com/buildbarn/bb-storage/pkg/proto/configuration/grpc"
1010
grpcpb "github.com/buildbarn/bb-storage/pkg/proto/configuration/grpc"
1111
"github.com/buildbarn/bb-storage/pkg/util"
12-
"github.com/grpc-ecosystem/go-grpc-prometheus"
12+
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
1313

1414
"google.golang.org/grpc"
1515
"google.golang.org/grpc/codes"
@@ -18,7 +18,6 @@ import (
1818
"google.golang.org/grpc/health"
1919
"google.golang.org/grpc/health/grpc_health_v1"
2020
"google.golang.org/grpc/keepalive"
21-
"google.golang.org/grpc/reflection"
2221
"google.golang.org/grpc/status"
2322

2423
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@@ -147,6 +146,18 @@ func NewServersFromConfigurationAndServe(configurations []*configuration.ServerC
147146
}))
148147
}
149148

149+
if len(configuration.Relays) != 0 {
150+
handler, err := newStreamRoutingFromConfiguration(configuration.Relays, grpcClientFactory, group)
151+
if err != nil {
152+
return util.StatusWrap(err, "Failed to create authenticator RPC client")
153+
}
154+
serverOptions = append(serverOptions, grpc.UnknownServiceHandler(func(srv any, stream grpc.ServerStream) error {
155+
transportStream := grpc.ServerTransportStreamFromContext(stream.Context())
156+
method := transportStream.Method()
157+
return handler.HandleStream(method, stream)
158+
}))
159+
}
160+
150161
// Create server.
151162
s := grpc.NewServer(serverOptions...)
152163
stopFunc := s.Stop
@@ -162,7 +173,9 @@ func NewServersFromConfigurationAndServe(configurations []*configuration.ServerC
162173

163174
// Enable default services.
164175
grpc_prometheus.Register(s)
165-
reflection.Register(s)
176+
if err := registerReflection(context.Background(), s, configuration.Relays, group, grpcClientFactory); err != nil {
177+
return util.StatusWrap(err, "Failed to create reflection service")
178+
}
166179
h := health.NewServer()
167180
grpc_health_v1.RegisterHealthServer(s, h)
168181
// TODO: Construct an API for the caller to indicate
@@ -208,3 +221,22 @@ func NewServersFromConfigurationAndServe(configurations []*configuration.ServerC
208221
}
209222
return nil
210223
}
224+
225+
func newStreamRoutingFromConfiguration(serverRelayConfiguration []*grpcpb.ServerRelayConfiguration, grpcClientFactory ClientFactory, group program.Group) (GrpcStreamHandler, error) {
226+
handler := NewRoutingStreamForwarder()
227+
for _, relay := range serverRelayConfiguration {
228+
grpcClient, err := grpcClientFactory.NewClientFromConfiguration(relay.GetEndpoint(), group)
229+
if err != nil {
230+
return nil, util.StatusWrap(err, "Failed to create authenticator RPC client")
231+
}
232+
for _, method := range relay.GetMethods() {
233+
if _, ok := handler.RouteTable[method]; ok {
234+
return nil, status.Errorf(codes.InvalidArgument, "Duplicated relay for %v", method)
235+
}
236+
handler.RouteTable[method] = NewSingleStreamForwarder(
237+
grpcClient,
238+
)
239+
}
240+
}
241+
return handler, nil
242+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package grpc
2+
3+
import (
4+
"io"
5+
6+
"golang.org/x/sync/errgroup"
7+
"google.golang.org/grpc"
8+
"google.golang.org/protobuf/types/known/emptypb"
9+
)
10+
11+
type GrpcStreamHandler interface {
12+
// Process the gRPC stream with the given service and method name.
13+
HandleStream(method string, stream grpc.ServerStream) error
14+
}
15+
16+
func NewSingleStreamForwarder(client grpc.ClientConnInterface) GrpcStreamHandler {
17+
return &singleStreamForwarder{
18+
backend: client,
19+
}
20+
}
21+
22+
type singleStreamForwarder struct {
23+
backend grpc.ClientConnInterface
24+
}
25+
26+
func (s *singleStreamForwarder) HandleStream(method string, incomingStream grpc.ServerStream) error {
27+
desc := grpc.StreamDesc{
28+
// StreamName and Handler are only used when registering handlers on a
29+
// server.
30+
StreamName: "",
31+
Handler: nil,
32+
// Streaming behaviour is wanted, single message is treated the same on
33+
// transport level, the application just closes the stream after the
34+
// first message.
35+
ServerStreams: true,
36+
ClientStreams: true,
37+
}
38+
group, groupCtx := errgroup.WithContext(incomingStream.Context())
39+
outgoingStream, err := s.backend.NewStream(groupCtx, &desc, method)
40+
if err != nil {
41+
return err
42+
}
43+
go func() {
44+
for {
45+
msg := &emptypb.Empty{}
46+
if err := incomingStream.RecvMsg(msg); err != nil {
47+
if err == io.EOF {
48+
// Let's to receive on outgoingStream, so don't cancel
49+
// grouptCtx.
50+
outgoingStream.CloseSend()
51+
return
52+
}
53+
// Cancel groupCtx immediately.
54+
group.Go(func() error { return err })
55+
return
56+
}
57+
if err := outgoingStream.SendMsg(msg); err != nil {
58+
if err == io.EOF {
59+
// The error will be returned by outgoingStream.RecvMsg(),
60+
// no need to cancel groupCtx now.
61+
return
62+
}
63+
// Cancel groupCtx immediately.
64+
group.Go(func() error { return err })
65+
return
66+
}
67+
}
68+
}()
69+
group.Go(func() error {
70+
for {
71+
msg := &emptypb.Empty{}
72+
if err := outgoingStream.RecvMsg(msg); err != nil {
73+
if err == io.EOF {
74+
return nil
75+
}
76+
return err
77+
}
78+
if err := incomingStream.SendMsg(msg); err != nil {
79+
return err
80+
}
81+
}
82+
})
83+
// group.Wait() may block a bit on incomingStream.SendMsg(), but that
84+
// shouldn't be for too long.
85+
return group.Wait()
86+
}

0 commit comments

Comments
 (0)