Skip to content

Commit 080d955

Browse files
committed
Review feedback 2
1 parent e8598d6 commit 080d955

File tree

3 files changed

+38
-34
lines changed

3 files changed

+38
-34
lines changed

pkg/grpc/reflection_relay.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import (
44
"context"
55
"maps"
66

7-
"github.com/buildbarn/bb-storage/pkg/program"
87
grpcpb "github.com/buildbarn/bb-storage/pkg/proto/configuration/grpc"
9-
"github.com/buildbarn/bb-storage/pkg/util"
108
"github.com/jhump/protoreflect/v2/grpcreflect"
119
"github.com/jhump/protoreflect/v2/protoresolve"
1210
"google.golang.org/grpc/reflection/grpc_reflection_v1"
@@ -25,20 +23,26 @@ var _ reflection.ServiceInfoProvider = (*combinedServiceInfoProvider)(nil)
2523
// GetServiceInfo returns the currently available services, which might have
2624
// changed since the creation of this reflection server.
2725
func (p *combinedServiceInfoProvider) GetServiceInfo() map[string]grpc.ServiceInfo {
28-
services := make(map[string]grpc.ServiceInfo)
26+
serverServiceInfo := p.server.GetServiceInfo()
27+
services := make(map[string]grpc.ServiceInfo, len(p.extraServices)+len(serverServiceInfo))
2928
maps.Copy(services, p.extraServices)
30-
maps.Copy(services, p.server.GetServiceInfo())
29+
maps.Copy(services, serverServiceInfo)
3130
return services
3231
}
3332

34-
// registerReflection registers the google.golang.org/grpc/reflection/ service
35-
// on a grpc.Server and calls remote backends in case for relayed services. The
36-
// connections to the backend will run with the backendCtx.
37-
func registerReflection(backendCtx context.Context, s *grpc.Server, serverRelayConfiguration []*grpcpb.ServerRelayConfiguration, group program.Group, grpcClientFactory ClientFactory) error {
33+
type serverRelayConfigWithGrpcClient struct {
34+
config *grpcpb.ServerRelayConfiguration
35+
grpcClient grpc.ClientConnInterface
36+
}
37+
38+
// registerReflectionServer registers the google.golang.org/grpc/reflection/
39+
// service on a grpc.Server and calls remote backends in case for relayed
40+
// services. The connections to the backend will run with the backendCtx.
41+
func registerReflectionServer(backendCtx context.Context, s *grpc.Server, serverRelayConfigurations []serverRelayConfigWithGrpcClient) error {
3842
// Accumulate all the service names.
3943
relayServices := make(map[string]grpc.ServiceInfo)
40-
for _, relay := range serverRelayConfiguration {
41-
for _, service := range relay.GetServices() {
44+
for _, relay := range serverRelayConfigurations {
45+
for _, service := range relay.config.GetServices() {
4246
// According to ServiceInfoProvider docs for ServerOptions.Services,
4347
// the reflection service is only interested in the service names.
4448
relayServices[service] = grpc.ServiceInfo{}
@@ -47,12 +51,8 @@ func registerReflection(backendCtx context.Context, s *grpc.Server, serverRelayC
4751

4852
// Make a combined descriptor and extension resolver.
4953
reflectionBackends := []protoresolve.Resolver{}
50-
for relayIdx, relay := range serverRelayConfiguration {
51-
grpcClient, err := grpcClientFactory.NewClientFromConfiguration(relay.Endpoint, group)
52-
if err != nil {
53-
return util.StatusWrapf(err, "Failed to create relay RPC client %d", relayIdx+1)
54-
}
55-
resolver := grpcreflect.NewClientAuto(backendCtx, grpcClient).AsResolver()
54+
for _, relay := range serverRelayConfigurations {
55+
resolver := grpcreflect.NewClientAuto(backendCtx, relay.grpcClient).AsResolver()
5656
reflectionBackends = append(reflectionBackends, resolver)
5757
}
5858
combinedRemoteResolver := protoresolve.Combine(reflectionBackends...)

pkg/grpc/routing_stream_handler.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,14 @@ import (
1717
func NewRoutingStreamHandler(routeTable map[string]grpc.StreamHandler) grpc.StreamHandler {
1818
return func(srv any, stream grpc.ServerStream) error {
1919
// All gRPC invocations has a grpc.ServerTransportStream context.
20-
serviceMethod := grpc.ServerTransportStreamFromContext(stream.Context()).Method()
20+
orgServiceMethod := grpc.ServerTransportStreamFromContext(stream.Context()).Method()
2121
// Service and method name parsing based on grpc.Server.handleStream().
22-
startIdx := 0
23-
if serviceMethod != "" && serviceMethod[0] == '/' {
24-
startIdx = 1
25-
}
22+
serviceMethod := strings.TrimPrefix(orgServiceMethod, "/")
2623
endIdx := strings.LastIndex(serviceMethod, "/")
27-
if endIdx <= startIdx {
28-
return status.Errorf(codes.InvalidArgument, "Malformed method name %v", serviceMethod)
24+
if endIdx == -1 {
25+
return status.Errorf(codes.InvalidArgument, "Malformed method name %v", orgServiceMethod)
2926
}
30-
service := serviceMethod[startIdx:endIdx]
27+
service := serviceMethod[:endIdx]
3128

3229
if streamHandler, ok := routeTable[service]; ok {
3330
return streamHandler(srv, stream)

pkg/grpc/server.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,19 @@ func NewServersFromConfigurationAndServe(configurations []*configuration.ServerC
146146
}))
147147
}
148148

149+
relayConfigWithGrpcClients := make([]serverRelayConfigWithGrpcClient, len(configuration.Relays))
150+
for relayIdx, relay := range configuration.Relays {
151+
grpcClient, err := grpcClientFactory.NewClientFromConfiguration(relay.Endpoint, group)
152+
if err != nil {
153+
return util.StatusWrapf(err, "Failed to create relay RPC client %d", relayIdx+1)
154+
}
155+
relayConfigWithGrpcClients[relayIdx] = serverRelayConfigWithGrpcClient{
156+
config: relay,
157+
grpcClient: grpcClient,
158+
}
159+
}
149160
if len(configuration.Relays) != 0 {
150-
handler, err := newRoutingStreamHandlerFromConfiguration(configuration.Relays, grpcClientFactory, group)
161+
handler, err := newRoutingStreamHandlerFromConfiguration(relayConfigWithGrpcClients)
151162
if err != nil {
152163
return err
153164
}
@@ -169,7 +180,7 @@ func NewServersFromConfigurationAndServe(configurations []*configuration.ServerC
169180

170181
// Enable default services.
171182
grpc_prometheus.Register(s)
172-
if err := registerReflection(context.Background(), s, configuration.Relays, group, grpcClientFactory); err != nil {
183+
if err := registerReflectionServer(context.Background(), s, relayConfigWithGrpcClients); err != nil {
173184
return util.StatusWrap(err, "Failed to create reflection service")
174185
}
175186
h := health.NewServer()
@@ -218,15 +229,11 @@ func NewServersFromConfigurationAndServe(configurations []*configuration.ServerC
218229
return nil
219230
}
220231

221-
func newRoutingStreamHandlerFromConfiguration(serverRelayConfiguration []*grpcpb.ServerRelayConfiguration, grpcClientFactory ClientFactory, group program.Group) (grpc.StreamHandler, error) {
232+
func newRoutingStreamHandlerFromConfiguration(serverRelayConfigurations []serverRelayConfigWithGrpcClient) (grpc.StreamHandler, error) {
222233
routeTable := make(map[string]grpc.StreamHandler)
223-
for i, relay := range serverRelayConfiguration {
224-
grpcClient, err := grpcClientFactory.NewClientFromConfiguration(relay.GetEndpoint(), group)
225-
if err != nil {
226-
return nil, util.StatusWrapf(err, "Failed to create gRPC relay RPC client at index %d", i)
227-
}
228-
handler := NewForwardingStreamHandler(grpcClient)
229-
for _, service := range relay.GetServices() {
234+
for _, relay := range serverRelayConfigurations {
235+
handler := NewForwardingStreamHandler(relay.grpcClient)
236+
for _, service := range relay.config.GetServices() {
230237
if _, ok := routeTable[service]; ok {
231238
return nil, status.Errorf(codes.InvalidArgument, "Duplicated gRPC relay for %v", service)
232239
}

0 commit comments

Comments
 (0)