Skip to content

Commit e8598d6

Browse files
committed
Review feedback
1 parent 68d8e54 commit e8598d6

File tree

6 files changed

+19
-28
lines changed

6 files changed

+19
-28
lines changed

pkg/grpc/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ go_library(
3232
"request_metadata_tracing_interceptor.go",
3333
"routing_stream_handler.go",
3434
"server.go",
35-
"server_transport_stream_context.go",
3635
"tls_client_certificate_authenticator.go",
3736
],
3837
importpath = "github.com/buildbarn/bb-storage/pkg/grpc",
@@ -70,7 +69,6 @@ go_library(
7069
"@org_golang_google_grpc//peer",
7170
"@org_golang_google_grpc//reflection",
7271
"@org_golang_google_grpc//reflection/grpc_reflection_v1",
73-
"@org_golang_google_grpc//reflection/grpc_reflection_v1alpha",
7472
"@org_golang_google_grpc//status",
7573
"@org_golang_google_grpc_security_advancedtls//:advancedtls",
7674
"@org_golang_google_protobuf//encoding/prototext",

pkg/grpc/forwarding_stream_handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ type forwardingStreamHandler struct {
2525
// incomingStream are forwarded to the backend stream and responses from the
2626
// backend stream are sent back in the incomingStream.
2727
func (s *forwardingStreamHandler) HandleStream(srv any, incomingStream grpc.ServerStream) error {
28-
method := MustStreamMethodFromContext(incomingStream.Context())
28+
// All gRPC invocations has a grpc.ServerTransportStream context.
29+
method := grpc.ServerTransportStreamFromContext(incomingStream.Context()).Method()
2930
desc := grpc.StreamDesc{
3031
// According to grpc.StreamDesc documentation, StreamName and Handler
3132
// are only used when registering handlers on a server.

pkg/grpc/forwarding_stream_handler_test.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,14 @@ func TestSimpleStreamForwarder(t *testing.T) {
6666
forwarder := bb_grpc.NewForwardingStreamHandler(backend)
6767
serverTransportStream := mock.NewMockServerTransportStream(ctrl)
6868
serverTransportStream.EXPECT().Method().Return("/serviceA/method1").AnyTimes()
69-
incomingStream := mock.NewMockServerStream(ctrl)
70-
outgoingStream := mock.NewMockClientStream(ctrl)
7169

7270
t.Run("RequestSuccess", func(t *testing.T) {
7371
synctest.Test(t, func(t *testing.T) {
7472
var outgoingStreamCtx context.Context
7573
outgoingRecvBarrier := make(chan struct{})
7674
incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream)
75+
incomingStream := mock.NewMockServerStream(ctrl)
76+
outgoingStream := mock.NewMockClientStream(ctrl)
7777

7878
newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn(
7979
func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
@@ -114,6 +114,8 @@ func TestSimpleStreamForwarder(t *testing.T) {
114114
synctest.Test(t, func(t *testing.T) {
115115
var outgoingStreamCtx context.Context
116116
incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream)
117+
incomingStream := mock.NewMockServerStream(ctrl)
118+
outgoingStream := mock.NewMockClientStream(ctrl)
117119

118120
newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn(
119121
func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
@@ -145,6 +147,8 @@ func TestSimpleStreamForwarder(t *testing.T) {
145147
synctest.Test(t, func(t *testing.T) {
146148
var outgoingStreamCtx context.Context
147149
incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream)
150+
incomingStream := mock.NewMockServerStream(ctrl)
151+
outgoingStream := mock.NewMockClientStream(ctrl)
148152

149153
newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn(
150154
func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
@@ -178,6 +182,8 @@ func TestSimpleStreamForwarder(t *testing.T) {
178182
var outgoingStreamCtx context.Context
179183
incomingRecvBarrier := make(chan struct{})
180184
incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream)
185+
incomingStream := mock.NewMockServerStream(ctrl)
186+
outgoingStream := mock.NewMockClientStream(ctrl)
181187

182188
newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn(
183189
func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
@@ -216,6 +222,8 @@ func TestSimpleStreamForwarder(t *testing.T) {
216222
var outgoingStreamCtx context.Context
217223
incomingRecvBarrier := make(chan struct{})
218224
incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream)
225+
incomingStream := mock.NewMockServerStream(ctrl)
226+
outgoingStream := mock.NewMockClientStream(ctrl)
219227

220228
newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn(
221229
func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
@@ -249,6 +257,8 @@ func TestSimpleStreamForwarder(t *testing.T) {
249257
synctest.Test(t, func(t *testing.T) {
250258
var outgoingStreamCtx context.Context
251259
incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream)
260+
incomingStream := mock.NewMockServerStream(ctrl)
261+
outgoingStream := mock.NewMockClientStream(ctrl)
252262

253263
newStreamCall := backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").DoAndReturn(
254264
func(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
@@ -280,6 +290,7 @@ func TestSimpleStreamForwarder(t *testing.T) {
280290
t.Run("NewStreamError", func(t *testing.T) {
281291
synctest.Test(t, func(t *testing.T) {
282292
incomingStreamCtx := grpc.NewContextWithServerTransportStream(context.Background(), serverTransportStream)
293+
incomingStream := mock.NewMockServerStream(ctrl)
283294

284295
incomingStream.EXPECT().Context().Return(incomingStreamCtx).AnyTimes()
285296
backend.EXPECT().NewStream(gomock.Any(), gomock.Any(), "/serviceA/method1").Return(nil, errors.New("no stream"))

pkg/grpc/reflection_relay.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ import (
99
"github.com/buildbarn/bb-storage/pkg/util"
1010
"github.com/jhump/protoreflect/v2/grpcreflect"
1111
"github.com/jhump/protoreflect/v2/protoresolve"
12-
v1reflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1"
13-
v1alphareflectiongrpc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
12+
"google.golang.org/grpc/reflection/grpc_reflection_v1"
1413

1514
"google.golang.org/grpc"
1615
"google.golang.org/grpc/reflection"
@@ -66,7 +65,6 @@ func registerReflection(backendCtx context.Context, s *grpc.Server, serverRelayC
6665
DescriptorResolver: combinedRemoteResolver,
6766
ExtensionResolver: protoresolve.TypesFromDescriptorPool(combinedRemoteResolver),
6867
}
69-
v1reflectiongrpc.RegisterServerReflectionServer(s, reflection.NewServerV1(serverOptions))
70-
v1alphareflectiongrpc.RegisterServerReflectionServer(s, reflection.NewServer(serverOptions))
68+
grpc_reflection_v1.RegisterServerReflectionServer(s, reflection.NewServerV1(serverOptions))
7169
return nil
7270
}

pkg/grpc/routing_stream_handler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ import (
1616
// com.google.devtools.build.v1.PublishBuildEvent
1717
func NewRoutingStreamHandler(routeTable map[string]grpc.StreamHandler) grpc.StreamHandler {
1818
return func(srv any, stream grpc.ServerStream) error {
19-
serviceMethod := MustStreamMethodFromContext(stream.Context())
19+
// All gRPC invocations has a grpc.ServerTransportStream context.
20+
serviceMethod := grpc.ServerTransportStreamFromContext(stream.Context()).Method()
2021
// Service and method name parsing based on grpc.Server.handleStream().
2122
startIdx := 0
2223
if serviceMethod != "" && serviceMethod[0] == '/' {

pkg/grpc/server_transport_stream_context.go

Lines changed: 0 additions & 18 deletions
This file was deleted.

0 commit comments

Comments
 (0)