Skip to content

Commit add581c

Browse files
craig[bot]shubhamdhama
andcommitted
Merge #148584
148584: server: register status, TS, Login/Logout, Admin services with DRPC server r=cthumuluru-crdb,aa-joshi a=shubhamdhama Enable the `Status`, `Login`, `Logout`, `Timeseries`, `Admin` services on the DRPC server in addition to gRPC. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Fixes: #148721 Fixes: #148722 Fixes: #148724 Fixes: #148723 Epic: CRDB-48925 Release note: None Co-authored-by: Shubham Dhama <[email protected]>
2 parents 4344c0a + 2ecd50c commit add581c

File tree

12 files changed

+199
-7
lines changed

12 files changed

+199
-7
lines changed

pkg/kv/kvserver/loqrecovery/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func NewServer(
119119
func (s Server) ServeLocalReplicas(
120120
ctx context.Context,
121121
_ *serverpb.RecoveryCollectLocalReplicaInfoRequest,
122-
stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
122+
stream serverpb.RPCAdmin_RecoveryCollectLocalReplicaInfoStream,
123123
) error {
124124
v := s.settings.Version.ActiveVersion(ctx)
125125
var stores []*kvserver.Store
@@ -149,7 +149,7 @@ func (s Server) ServeLocalReplicas(
149149
func (s Server) ServeClusterReplicas(
150150
ctx context.Context,
151151
req *serverpb.RecoveryCollectReplicaInfoRequest,
152-
outStream serverpb.Admin_RecoveryCollectReplicaInfoServer,
152+
outStream serverpb.RPCAdmin_RecoveryCollectReplicaInfoStream,
153153
kvDB *kv.DB,
154154
) (err error) {
155155
var descriptors, nodes, replicas atomic.Int64

pkg/server/admin.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ import (
8282
"google.golang.org/grpc/codes"
8383
"google.golang.org/grpc/metadata"
8484
grpcstatus "google.golang.org/grpc/status"
85+
"storj.io/drpc"
86+
"storj.io/drpc/drpcerr"
8587
)
8688

8789
// Number of empty ranges for table descriptors that aren't actually tables. These
@@ -253,6 +255,26 @@ func (s *adminServer) RegisterService(g *grpc.Server) {
253255
serverpb.RegisterAdminServer(g, s)
254256
}
255257

258+
type drpcSystemAdminServer struct {
259+
*systemAdminServer
260+
}
261+
262+
// RegisterDRPCService registers the Admin service with the DRPC server running
263+
// in the system tenant.
264+
func (s *systemAdminServer) RegisterDRPCService(d drpc.Mux) error {
265+
return serverpb.DRPCRegisterAdmin(d, &drpcSystemAdminServer{systemAdminServer: s})
266+
}
267+
268+
type drpcAdminServer struct {
269+
*adminServer
270+
}
271+
272+
// RegisterDRPCService registers the Admin service with the DRPC server running
273+
// in a secondary tenant.
274+
func (s *adminServer) RegisterDRPCService(d drpc.Mux) error {
275+
return serverpb.DRPCRegisterAdmin(d, &drpcAdminServer{adminServer: s})
276+
}
277+
256278
// RegisterGateway starts the gateway (i.e. reverse proxy) that proxies HTTP requests
257279
// to the appropriate gRPC endpoints.
258280
func (s *adminServer) RegisterGateway(
@@ -3312,9 +3334,23 @@ func (s *systemAdminServer) SendKVBatch(
33123334
return br, nil
33133335
}
33143336

3337+
func (s *drpcSystemAdminServer) RecoveryCollectReplicaInfo(
3338+
request *serverpb.RecoveryCollectReplicaInfoRequest,
3339+
stream serverpb.DRPCAdmin_RecoveryCollectReplicaInfoStream,
3340+
) error {
3341+
return s.recoveryCollectReplicaInfo(request, stream)
3342+
}
3343+
33153344
func (s *systemAdminServer) RecoveryCollectReplicaInfo(
33163345
request *serverpb.RecoveryCollectReplicaInfoRequest,
33173346
stream serverpb.Admin_RecoveryCollectReplicaInfoServer,
3347+
) error {
3348+
return s.recoveryCollectReplicaInfo(request, stream)
3349+
}
3350+
3351+
func (s *systemAdminServer) recoveryCollectReplicaInfo(
3352+
request *serverpb.RecoveryCollectReplicaInfoRequest,
3353+
stream serverpb.RPCAdmin_RecoveryCollectReplicaInfoStream,
33183354
) error {
33193355
ctx := stream.Context()
33203356
ctx = s.server.AnnotateCtx(ctx)
@@ -3327,9 +3363,23 @@ func (s *systemAdminServer) RecoveryCollectReplicaInfo(
33273363
return s.server.recoveryServer.ServeClusterReplicas(ctx, request, stream, s.server.db)
33283364
}
33293365

3366+
func (s *drpcSystemAdminServer) RecoveryCollectLocalReplicaInfo(
3367+
request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
3368+
stream serverpb.DRPCAdmin_RecoveryCollectLocalReplicaInfoStream,
3369+
) error {
3370+
return s.recoveryCollectLocalReplicaInfo(request, stream)
3371+
}
3372+
33303373
func (s *systemAdminServer) RecoveryCollectLocalReplicaInfo(
33313374
request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
33323375
stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
3376+
) error {
3377+
return s.recoveryCollectLocalReplicaInfo(request, stream)
3378+
}
3379+
3380+
func (s *systemAdminServer) recoveryCollectLocalReplicaInfo(
3381+
request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
3382+
stream serverpb.RPCAdmin_RecoveryCollectLocalReplicaInfoStream,
33333383
) error {
33343384
ctx := stream.Context()
33353385
ctx = s.server.AnnotateCtx(ctx)
@@ -4043,3 +4093,21 @@ func (s *systemAdminServer) ReadFromTenantInfo(
40434093

40444094
return &serverpb.ReadFromTenantInfoResponse{ReadFrom: dstID, ReadAt: progress.GetStreamIngest().ReplicatedTime}, nil
40454095
}
4096+
4097+
// RecoveryCollectReplicaInfo is unimplemented here because adminServer also
4098+
// have it delegated from embedded serverpb.UnimplementedAdminServer.
4099+
func (s *drpcAdminServer) RecoveryCollectReplicaInfo(
4100+
request *serverpb.RecoveryCollectReplicaInfoRequest,
4101+
stream serverpb.DRPCAdmin_RecoveryCollectReplicaInfoStream,
4102+
) error {
4103+
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
4104+
}
4105+
4106+
// RecoveryCollectLocalReplicaInfo is unimplemented here because adminServer
4107+
// also have it delegated from embedded serverpb.UnimplementedAdminServer.
4108+
func (s *drpcAdminServer) RecoveryCollectLocalReplicaInfo(
4109+
request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
4110+
stream serverpb.DRPCAdmin_RecoveryCollectLocalReplicaInfoStream,
4111+
) error {
4112+
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
4113+
}

pkg/server/authserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ go_library(
4444
"@com_github_cockroachdb_logtags//:logtags",
4545
"@com_github_cockroachdb_redact//:redact",
4646
"@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library",
47+
"@io_storj_drpc//:drpc",
4748
"@org_golang_google_grpc//:grpc",
4849
"@org_golang_google_grpc//codes",
4950
"@org_golang_google_grpc//metadata",

pkg/server/authserver/api.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
1919
"github.com/grpc-ecosystem/grpc-gateway/runtime"
2020
"google.golang.org/grpc"
21+
"storj.io/drpc"
2122
)
2223

2324
type Server interface {
2425
RegisterService(*grpc.Server)
2526
RegisterGateway(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error
2627

28+
RegisterDRPCService(drpc.Mux) error
29+
2730
// UserLogin verifies an incoming request by a user to create an web
2831
// authentication session. It checks the provided credentials against
2932
// the system.users table, and if successful creates a new

pkg/server/authserver/authentication.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"google.golang.org/grpc/codes"
4848
"google.golang.org/grpc/metadata"
4949
"google.golang.org/grpc/status"
50+
"storj.io/drpc"
5051
)
5152

5253
const (
@@ -132,6 +133,15 @@ type authenticationServer struct {
132133
func (s *authenticationServer) RegisterService(g *grpc.Server) {
133134
serverpb.RegisterLogInServer(g, s)
134135
serverpb.RegisterLogOutServer(g, s)
136+
137+
}
138+
139+
// RegisterService registers the LogIn and LogOut services with DRPC.
140+
func (s *authenticationServer) RegisterDRPCService(d drpc.Mux) error {
141+
if err := serverpb.DRPCRegisterLogIn(d, s); err != nil {
142+
return err
143+
}
144+
return serverpb.DRPCRegisterLogOut(d, s)
135145
}
136146

137147
// RegisterGateway starts the gateway (i.e. reverse proxy) that proxies HTTP requests

pkg/server/drain.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ var (
8888
// instructs the process to terminate.
8989
// This method is part of the serverpb.AdminClient interface.
9090
func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_DrainServer) error {
91+
return s.drain(req, stream)
92+
}
93+
94+
func (s *adminServer) drain(
95+
req *serverpb.DrainRequest, stream serverpb.RPCAdmin_DrainStream,
96+
) error {
9197
ctx := stream.Context()
9298
ctx = s.AnnotateCtx(ctx)
9399

@@ -114,6 +120,22 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
114120
return s.drainServer.handleDrain(ctx, req, stream)
115121
}
116122

123+
// Drain puts the node into the specified drain mode(s) and optionally
124+
// instructs the process to terminate.
125+
func (s *drpcAdminServer) Drain(
126+
req *serverpb.DrainRequest, stream serverpb.DRPCAdmin_DrainStream,
127+
) error {
128+
return s.adminServer.drain(req, stream)
129+
}
130+
131+
// Drain puts the node into the specified drain mode(s) and optionally
132+
// instructs the process to terminate.
133+
func (s *drpcSystemAdminServer) Drain(
134+
req *serverpb.DrainRequest, stream serverpb.DRPCAdmin_DrainStream,
135+
) error {
136+
return s.adminServer.drain(req, stream)
137+
}
138+
117139
type drainServer struct {
118140
stopper *stop.Stopper
119141
// stopTrigger is used to request that the server is shut down.
@@ -162,7 +184,7 @@ func (s *drainServer) setNode(node *Node, nodeLiveness *liveness.NodeLiveness) {
162184
}
163185

164186
func (s *drainServer) handleDrain(
165-
ctx context.Context, req *serverpb.DrainRequest, stream serverpb.Admin_DrainServer,
187+
ctx context.Context, req *serverpb.DrainRequest, stream serverpb.RPCAdmin_DrainStream,
166188
) error {
167189
log.Ops.Infof(ctx, "drain request received with doDrain = %v, shutdown = %v", req.DoDrain, req.Shutdown)
168190

@@ -240,7 +262,7 @@ func delegateDrain(
240262
ctx context.Context,
241263
req *serverpb.DrainRequest,
242264
client serverpb.RPCAdminClient,
243-
stream serverpb.Admin_DrainServer,
265+
stream serverpb.RPCAdmin_DrainStream,
244266
) error {
245267
// Retrieve the stream interface to the target node.
246268
drainClient, err := client.Drain(ctx, req)

pkg/server/drpc_server.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/server/srverrors"
1414
"github.com/cockroachdb/errors"
1515
"google.golang.org/grpc/codes"
16+
"storj.io/drpc"
1617
"storj.io/drpc/drpcerr"
1718
)
1819

@@ -75,3 +76,9 @@ func (s *drpcServer) health(ctx context.Context) error {
7576
return srverrors.ServerError(ctx, errors.Newf("unknown mode: %v", sm))
7677
}
7778
}
79+
80+
// drpcServiceRegistrar is implemented by servers that create a DRPC server and
81+
// registers it with drpc.Mux
82+
type drpcServiceRegistrar interface {
83+
RegisterDRPCService(drpc.Mux) error
84+
}

pkg/server/server.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1253,6 +1253,12 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
12531253
gw.RegisterService(grpcServer.Server)
12541254
}
12551255

1256+
for _, s := range []drpcServiceRegistrar{sAdmin, sStatus, sAuth, &sTS} {
1257+
if err := s.RegisterDRPCService(drpcServer); err != nil {
1258+
return nil, err
1259+
}
1260+
}
1261+
12561262
// Tell the node event logger (join, restart) how to populate SQL entries
12571263
// into system.eventlog.
12581264
node.InitLogger(sqlServer.execCfg)

pkg/server/status.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ import (
9595
"google.golang.org/grpc"
9696
"google.golang.org/grpc/codes"
9797
"google.golang.org/grpc/status"
98+
"storj.io/drpc"
9899
)
99100

100101
const (
@@ -695,6 +696,11 @@ func (s *statusServer) RegisterService(g *grpc.Server) {
695696
serverpb.RegisterStatusServer(g, s)
696697
}
697698

699+
// RegisterService registers the DRPC service.
700+
func (s *statusServer) RegisterDRPCService(d drpc.Mux) error {
701+
return serverpb.DRPCRegisterStatus(d, s)
702+
}
703+
698704
// RegisterGateway starts the gateway (i.e. reverse
699705
// proxy) that proxies HTTP requests to the appropriate gRPC endpoints.
700706
func (s *statusServer) RegisterGateway(
@@ -709,6 +715,11 @@ func (s *systemStatusServer) RegisterService(g *grpc.Server) {
709715
serverpb.RegisterStatusServer(g, s)
710716
}
711717

718+
// RegisterService registers the DRPC service.
719+
func (s *systemStatusServer) RegisterDRPCService(d drpc.Mux) error {
720+
return serverpb.DRPCRegisterStatus(d, s)
721+
}
722+
712723
func (s *statusServer) parseNodeID(nodeIDParam string) (roachpb.NodeID, bool, error) {
713724
id, local, err := s.serverIterator.parseServerID(nodeIDParam)
714725
return roachpb.NodeID(id), local, err

pkg/server/tenant.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,12 @@ func newTenantServer(
480480
gw.RegisterService(args.grpc.Server)
481481
}
482482

483+
for _, s := range []drpcServiceRegistrar{sAdmin, sStatus, sAuth, args.tenantTimeSeriesServer} {
484+
if err := s.RegisterDRPCService(args.drpc); err != nil {
485+
return nil, err
486+
}
487+
}
488+
483489
// Tell the status/admin servers how to access SQL structures.
484490
sStatus.setStmtDiagnosticsRequester(sqlServer.execCfg.StmtDiagnosticsRecorder)
485491
serverIterator.sqlServer = sqlServer

0 commit comments

Comments
 (0)