@@ -23,6 +23,10 @@ import (
23
23
"google.golang.org/grpc/credentials"
24
24
grpcpeer "google.golang.org/grpc/peer"
25
25
"google.golang.org/grpc/status"
26
+ "storj.io/drpc"
27
+ "storj.io/drpc/drpcctx"
28
+ "storj.io/drpc/drpcmetadata"
29
+ "storj.io/drpc/drpcmux"
26
30
)
27
31
28
32
var errTLSInfoMissing = authError ("TLSInfo is not available in request context" )
@@ -41,11 +45,14 @@ func authErrorf(format string, a ...interface{}) error {
41
45
type kvAuth struct {
42
46
sv * settings.Values
43
47
tenant tenantAuthorizer
48
+ isDRPC bool
44
49
}
45
50
46
51
// kvAuth implements the auth interface.
47
- func (a kvAuth ) AuthUnary () grpc.UnaryServerInterceptor { return a .unaryInterceptor }
48
- func (a kvAuth ) AuthStream () grpc.StreamServerInterceptor { return a .streamInterceptor }
52
+ func (a kvAuth ) AuthUnary () grpc.UnaryServerInterceptor { return a .unaryInterceptor }
53
+ func (a kvAuth ) AuthDRPCUnary () drpcmux.UnaryServerInterceptor { return a .unaryDRPCInterceptor }
54
+ func (a kvAuth ) AuthDRPCStream () drpcmux.StreamServerInterceptor { return a .streamDRPCInterceptor }
55
+ func (a kvAuth ) AuthStream () grpc.StreamServerInterceptor { return a .streamInterceptor }
49
56
50
57
func (a kvAuth ) unaryInterceptor (
51
58
ctx context.Context , req interface {}, info * grpc.UnaryServerInfo , handler grpc.UnaryHandler ,
@@ -81,7 +88,41 @@ func (a kvAuth) unaryInterceptor(
81
88
return nil , err
82
89
}
83
90
case authzTenantServerToTenantServer :
84
- // Tenant servers can see all of each other's RPCs.
91
+ // Tenant servers can see all of each other's RPCs.
92
+ case authzPrivilegedPeerToServer :
93
+ // Privileged clients (root/node) can see all RPCs.
94
+ default :
95
+ return nil , errors .AssertionFailedf ("unhandled case: %T" , err )
96
+ }
97
+ return handler (ctx , req )
98
+ }
99
+
100
+ func (a kvAuth ) unaryDRPCInterceptor (
101
+ ctx context.Context , req interface {}, rpc string , handler drpcmux.UnaryHandler ,
102
+ ) (out interface {}, err error ) {
103
+ // Perform authentication and authz selection.
104
+ authnRes , authz , err := a .authenticateAndSelectAuthzRule (ctx )
105
+ if err != nil {
106
+ return nil , err
107
+ }
108
+
109
+ // Enhance the context to ensure the API handler only sees a client tenant ID
110
+ // via roachpb.ClientTenantFromContext when relevant.
111
+ ctx = contextForRequest (ctx , authnRes )
112
+
113
+ // Handle authorization according to the selected authz method.
114
+ switch ar := authz .(type ) {
115
+ case authzTenantServerToKVServer :
116
+ // Clear any leftover incoming DRPC metadata, if this call is
117
+ // originating from a RPC handler function called as a result of a
118
+ // tenant call. See unaryInterceptor for more details.
119
+ ctx = drpcmetadata .ClearContext (ctx )
120
+
121
+ if err := a .tenant .authorize (ctx , a .sv , roachpb .TenantID (ar ), rpc , req ); err != nil {
122
+ return nil , err
123
+ }
124
+ case authzTenantServerToTenantServer :
125
+ // Tenant servers can see all of each other's RPCs.
85
126
case authzPrivilegedPeerToServer :
86
127
// Privileged clients (root/node) can see all RPCs.
87
128
default :
@@ -143,7 +184,7 @@ func (a kvAuth) streamInterceptor(
143
184
},
144
185
}
145
186
case authzTenantServerToTenantServer :
146
- // Tenant servers can see all of each other's RPCs.
187
+ // Tenant servers can see all of each other's RPCs.
147
188
case authzPrivilegedPeerToServer :
148
189
// Privileged clients (root/node) can see all RPCs.
149
190
default :
@@ -152,6 +193,55 @@ func (a kvAuth) streamInterceptor(
152
193
return handler (srv , ss )
153
194
}
154
195
196
+ func (a kvAuth ) streamDRPCInterceptor (
197
+ stream drpc.Stream , rpc string , handler drpcmux.StreamHandler ,
198
+ ) (out interface {}, err error ) {
199
+ ctx := stream .Context ()
200
+ // Perform authentication and authz selection.
201
+ authnRes , authz , err := a .authenticateAndSelectAuthzRule (ctx )
202
+ if err != nil {
203
+ return nil , err
204
+ }
205
+
206
+ // Enhance the context to ensure the API handler only sees a client tenant ID
207
+ // via roachpb.ClientTenantFromContext when relevant.
208
+ ctx = contextForRequest (ctx , authnRes )
209
+
210
+ // Handle authorization according to the selected authz method.
211
+ switch ar := authz .(type ) {
212
+ case authzTenantServerToKVServer :
213
+ // Clear any leftover incoming DRPC metadata, if this call is
214
+ // originating from a RPC handler function called as a result of a
215
+ // tenant call. See streamInterceptor for more details.
216
+
217
+ if rpc == "/cockroach.blobs.Blob/PutStream" {
218
+ ctx = drpcmetadata .ClearContextExcept (ctx , "filename" )
219
+ } else {
220
+ ctx = drpcmetadata .ClearContext (ctx )
221
+ }
222
+
223
+ originalStream := stream
224
+ stream = & wrappedDRPCServerStream {
225
+ Stream : originalStream ,
226
+ ctx : ctx ,
227
+ recv : func (m drpc.Message , enc drpc.Encoding ) error {
228
+ if err := originalStream .MsgRecv (m , enc ); err != nil {
229
+ return err
230
+ }
231
+ // 'm' is now populated and contains the request from the client.
232
+ return a .tenant .authorize (ctx , a .sv , roachpb .TenantID (ar ), rpc , m )
233
+ },
234
+ }
235
+ case authzTenantServerToTenantServer :
236
+ // Tenant servers can see all of each other's RPCs.
237
+ case authzPrivilegedPeerToServer :
238
+ // Privileged clients (root/node) can see all RPCs.
239
+ default :
240
+ return nil , errors .AssertionFailedf ("unhandled case: %T" , err )
241
+ }
242
+ return handler (stream )
243
+ }
244
+
155
245
func (a kvAuth ) authenticateAndSelectAuthzRule (
156
246
ctx context.Context ,
157
247
) (authnResult , requiredAuthzMethod , error ) {
@@ -162,7 +252,7 @@ func (a kvAuth) authenticateAndSelectAuthzRule(
162
252
}
163
253
164
254
// Select authorization rules suitable for the peer.
165
- authz , err := a .selectAuthzMethod (ctx , authnRes )
255
+ authz , err := a .selectAuthzMethod (authnRes )
166
256
if err != nil {
167
257
return nil , nil , err
168
258
}
@@ -176,19 +266,30 @@ type authnResult interface {
176
266
authnResult ()
177
267
}
178
268
179
- func getClientCert (ctx context.Context ) (* x509.Certificate , error ) {
180
- p , ok := grpcpeer .FromContext (ctx )
181
- if ! ok {
182
- return nil , errTLSInfoMissing
269
+ func (a kvAuth ) getClientCert (ctx context.Context ) (* x509.Certificate , error ) {
270
+ var certs []* x509.Certificate
271
+ if a .isDRPC {
272
+ info , ok := drpcctx .GetPeerConnectionInfo (ctx )
273
+ if ! ok {
274
+ return nil , errTLSInfoMissing
275
+ }
276
+ certs = info .Certificates
277
+ } else {
278
+ p , ok := grpcpeer .FromContext (ctx )
279
+ if ! ok {
280
+ return nil , errTLSInfoMissing
281
+ }
282
+ tlsInfo , ok := p .AuthInfo .(credentials.TLSInfo )
283
+ if ! ok {
284
+ return nil , errTLSInfoMissing
285
+ }
286
+ certs = tlsInfo .State .PeerCertificates
183
287
}
184
288
185
- tlsInfo , ok := p .AuthInfo .(credentials.TLSInfo )
186
- if ! ok || len (tlsInfo .State .PeerCertificates ) == 0 {
289
+ if len (certs ) == 0 {
187
290
return nil , errTLSInfoMissing
188
291
}
189
-
190
- clientCert := tlsInfo .State .PeerCertificates [0 ]
191
- return clientCert , nil
292
+ return certs [0 ], nil
192
293
}
193
294
194
295
// authnSuccessPeerIsTenantServer indicates authentication has
@@ -247,7 +348,7 @@ func (a kvAuth) authenticateLocalRequest(
247
348
) (authnResult , error ) {
248
349
// Sanity check: verify that we do not also have gRPC network credentials
249
350
// in the context. This would indicate that metadata was improperly propagated.
250
- maybeTid , err := tenantIDFromRPCMetadata (ctx )
351
+ maybeTid , err := a . tenantIDFromRPCMetadata (ctx )
251
352
if err != nil || maybeTid .IsSet () {
252
353
logcrash .ReportOrPanic (ctx , a .sv , "programming error: network credentials in internal adapter request (%v, %v)" , maybeTid , err )
253
354
return nil , authErrorf ("programming error" )
@@ -268,12 +369,12 @@ func (a kvAuth) authenticateLocalRequest(
268
369
func (a kvAuth ) authenticateNetworkRequest (ctx context.Context ) (authnResult , error ) {
269
370
// We will need to look at the TLS cert in any case, so extract it
270
371
// first.
271
- clientCert , err := getClientCert (ctx )
372
+ clientCert , err := a . getClientCert (ctx )
272
373
if err != nil {
273
374
return nil , err
274
375
}
275
376
276
- tenantIDFromMetadata , err := tenantIDFromRPCMetadata (ctx )
377
+ tenantIDFromMetadata , err := a . tenantIDFromRPCMetadata (ctx )
277
378
if err != nil {
278
379
return nil , authErrorf ("client provided invalid tenant ID: %v" , err )
279
380
}
@@ -357,9 +458,7 @@ func (authzPrivilegedPeerToServer) rpcAuthzMethod() {}
357
458
358
459
// selectAuthzMethod selects the authorization rule to use for the
359
460
// given authentication event.
360
- func (a kvAuth ) selectAuthzMethod (
361
- ctx context.Context , ar authnResult ,
362
- ) (requiredAuthzMethod , error ) {
461
+ func (a kvAuth ) selectAuthzMethod (ar authnResult ) (requiredAuthzMethod , error ) {
363
462
switch res := ar .(type ) {
364
463
case authnSuccessPeerIsTenantServer :
365
464
// The client is a tenant server. We have two possible cases:
@@ -488,9 +587,20 @@ func newTenantClientCreds(tid roachpb.TenantID) credentials.PerRPCCredentials {
488
587
}
489
588
}
490
589
590
+ func newPerRPCTIDMetdata (tid roachpb.TenantID ) (string , string ) {
591
+ return clientTIDMetadataHeaderKey , fmt .Sprint (tid )
592
+ }
593
+
491
594
// tenantIDFromRPCMetadata checks if there is a tenant ID in
492
595
// the incoming gRPC metadata.
493
- func tenantIDFromRPCMetadata (ctx context.Context ) (roachpb.TenantID , error ) {
596
+ func (a kvAuth ) tenantIDFromRPCMetadata (ctx context.Context ) (roachpb.TenantID , error ) {
597
+ if a .isDRPC {
598
+ val , ok := drpcmetadata .GetValue (ctx , clientTIDMetadataHeaderKey )
599
+ if ! ok {
600
+ return roachpb.TenantID {}, nil
601
+ }
602
+ return tenantIDFromString (val , "drpc metadata" )
603
+ }
494
604
val , ok := grpcutil .FastFirstValueFromIncomingContext (ctx , clientTIDMetadataHeaderKey )
495
605
if ! ok {
496
606
return roachpb.TenantID {}, nil
0 commit comments