Skip to content

Commit 67591eb

Browse files
author
Michal Witkowski
authored
Break StreamDirector interface, fix metadata propagation for gRPC-Go>1.5. (#20)
1 parent 97396d9 commit 67591eb

File tree

4 files changed

+29
-16
lines changed

4 files changed

+29
-16
lines changed

proxy/director.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ import (
1313
// The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers).
1414
// If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned.
1515
//
16+
// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
17+
// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
18+
// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
19+
//
1620
// It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors
1721
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
1822
//
1923
// See the rather rich example.
20-
type StreamDirector func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error)
24+
type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error)

proxy/examples_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,26 @@ func ExampleTransparentHandler() {
3535
// Provide sa simple example of a director that shields internal services and dials a staging or production backend.
3636
// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.
3737
func ExampleStreamDirector() {
38-
director = func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) {
38+
director = func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
3939
// Make sure we never forward internal services.
4040
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
41-
return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
41+
return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
4242
}
43-
md, ok := metadata.FromContext(ctx)
43+
md, ok := metadata.FromIncomingContext(ctx)
44+
// Copy the inbound metadata explicitly.
45+
outCtx, _ := context.WithCancel(ctx)
46+
outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())
4447
if ok {
4548
// Decide on which backend to dial
4649
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
4750
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
48-
return grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
51+
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
52+
return outCtx, conn, err
4953
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
50-
return grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
54+
conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
55+
return outCtx, conn, err
5156
}
5257
}
53-
return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
58+
return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
5459
}
5560
}

proxy/handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
6565
return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
6666
}
6767
fullMethodName := lowLevelServerStream.Method()
68-
clientCtx, clientCancel := context.WithCancel(serverStream.Context())
69-
backendConn, err := s.director(serverStream.Context(), fullMethodName)
68+
// We require that the director's returned context inherits from the serverStream.Context().
69+
outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName)
70+
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
7071
if err != nil {
7172
return err
7273
}

proxy/handler_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type assertingService struct {
4646

4747
func (s *assertingService) PingEmpty(ctx context.Context, _ *pb.Empty) (*pb.PingResponse, error) {
4848
// Check that this call has client's metadata.
49-
md, ok := metadata.FromContext(ctx)
49+
md, ok := metadata.FromIncomingContext(ctx)
5050
assert.True(s.t, ok, "PingEmpty call must have metadata in context")
5151
_, ok = md[clientMdKey]
5252
assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata")
@@ -116,7 +116,7 @@ func (s *ProxyHappySuite) ctx() context.Context {
116116
}
117117

118118
func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() {
119-
ctx := metadata.NewContext(s.ctx(), metadata.Pairs(clientMdKey, "true"))
119+
ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(clientMdKey, "true"))
120120
out, err := s.testClient.PingEmpty(ctx, &pb.Empty{})
121121
require.NoError(s.T(), err, "PingEmpty should succeed without errors")
122122
require.Equal(s.T(), &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, out)
@@ -148,7 +148,7 @@ func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() {
148148

149149
func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() {
150150
// See SetupSuite where the StreamDirector has a special case.
151-
ctx := metadata.NewContext(s.ctx(), metadata.Pairs(rejectingMdKey, "true"))
151+
ctx := metadata.NewOutgoingContext(s.ctx(), metadata.Pairs(rejectingMdKey, "true"))
152152
_, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"})
153153
require.Error(s.T(), err, "Director should reject this RPC")
154154
assert.Equal(s.T(), codes.PermissionDenied, grpc.Code(err))
@@ -204,14 +204,17 @@ func (s *ProxyHappySuite) SetupSuite() {
204204
// Setup of the proxy's Director.
205205
s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec()))
206206
require.NoError(s.T(), err, "must not error on deferred client Dial")
207-
director := func(ctx context.Context, fullName string) (*grpc.ClientConn, error) {
208-
md, ok := metadata.FromContext(ctx)
207+
director := func(ctx context.Context, fullName string) (context.Context, *grpc.ClientConn, error) {
208+
md, ok := metadata.FromIncomingContext(ctx)
209209
if ok {
210210
if _, exists := md[rejectingMdKey]; exists {
211-
return nil, grpc.Errorf(codes.PermissionDenied, "testing rejection")
211+
return ctx, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection")
212212
}
213213
}
214-
return s.serverClientConn, nil
214+
// Explicitly copy the metadata, otherwise the tests will fail.
215+
outCtx, _ := context.WithCancel(ctx)
216+
outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())
217+
return outCtx, s.serverClientConn, nil
215218
}
216219
s.proxy = grpc.NewServer(
217220
grpc.CustomCodec(proxy.Codec()),

0 commit comments

Comments
 (0)