Skip to content

Commit 7ac157f

Browse files
committed
server: register Admin service with DRPC server
Enable these service on the DRPC server in addition to gRPC. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Epic: CRDB-48925 Release note: None
1 parent 446058d commit 7ac157f

File tree

5 files changed

+100
-4
lines changed

5 files changed

+100
-4
lines changed

pkg/kv/kvserver/loqrecovery/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func NewServer(
119119
func (s Server) ServeLocalReplicas(
120120
ctx context.Context,
121121
_ *serverpb.RecoveryCollectLocalReplicaInfoRequest,
122-
stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
122+
stream serverpb.RPCAdmin_RecoveryCollectLocalReplicaInfoStream,
123123
) error {
124124
v := s.settings.Version.ActiveVersion(ctx)
125125
var stores []*kvserver.Store
@@ -149,7 +149,7 @@ func (s Server) ServeLocalReplicas(
149149
func (s Server) ServeClusterReplicas(
150150
ctx context.Context,
151151
req *serverpb.RecoveryCollectReplicaInfoRequest,
152-
outStream serverpb.Admin_RecoveryCollectReplicaInfoServer,
152+
outStream serverpb.RPCAdmin_RecoveryCollectReplicaInfoStream,
153153
kvDB *kv.DB,
154154
) (err error) {
155155
var descriptors, nodes, replicas atomic.Int64

pkg/server/admin.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ import (
8282
"google.golang.org/grpc/codes"
8383
"google.golang.org/grpc/metadata"
8484
grpcstatus "google.golang.org/grpc/status"
85+
"storj.io/drpc"
86+
"storj.io/drpc/drpcerr"
8587
)
8688

8789
// Number of empty ranges for table descriptors that aren't actually tables. These
@@ -253,6 +255,26 @@ func (s *adminServer) RegisterService(g *grpc.Server) {
253255
serverpb.RegisterAdminServer(g, s)
254256
}
255257

258+
type drpcSystemAdminServer struct {
259+
*systemAdminServer
260+
}
261+
262+
// RegisterDRPCService registers the Admin service with the DRPC server running
263+
// in the system tenant.
264+
func (s *systemAdminServer) RegisterDRPCService(d drpc.Mux) error {
265+
return serverpb.DRPCRegisterAdmin(d, &drpcSystemAdminServer{systemAdminServer: s})
266+
}
267+
268+
type drpcAdminServer struct {
269+
*adminServer
270+
}
271+
272+
// RegisterDRPCService registers the Admin service with the DRPC server running
273+
// in a secondary tenant.
274+
func (s *adminServer) RegisterDRPCService(d drpc.Mux) error {
275+
return serverpb.DRPCRegisterAdmin(d, &drpcAdminServer{adminServer: s})
276+
}
277+
256278
// RegisterGateway starts the gateway (i.e. reverse proxy) that proxies HTTP requests
257279
// to the appropriate gRPC endpoints.
258280
func (s *adminServer) RegisterGateway(
@@ -3312,9 +3334,23 @@ func (s *systemAdminServer) SendKVBatch(
33123334
return br, nil
33133335
}
33143336

3337+
func (s *drpcSystemAdminServer) RecoveryCollectReplicaInfo(
3338+
request *serverpb.RecoveryCollectReplicaInfoRequest,
3339+
stream serverpb.DRPCAdmin_RecoveryCollectReplicaInfoStream,
3340+
) error {
3341+
return s.recoveryCollectReplicaInfo(request, stream)
3342+
}
3343+
33153344
func (s *systemAdminServer) RecoveryCollectReplicaInfo(
33163345
request *serverpb.RecoveryCollectReplicaInfoRequest,
33173346
stream serverpb.Admin_RecoveryCollectReplicaInfoServer,
3347+
) error {
3348+
return s.recoveryCollectReplicaInfo(request, stream)
3349+
}
3350+
3351+
func (s *systemAdminServer) recoveryCollectReplicaInfo(
3352+
request *serverpb.RecoveryCollectReplicaInfoRequest,
3353+
stream serverpb.RPCAdmin_RecoveryCollectReplicaInfoStream,
33183354
) error {
33193355
ctx := stream.Context()
33203356
ctx = s.server.AnnotateCtx(ctx)
@@ -3327,9 +3363,23 @@ func (s *systemAdminServer) RecoveryCollectReplicaInfo(
33273363
return s.server.recoveryServer.ServeClusterReplicas(ctx, request, stream, s.server.db)
33283364
}
33293365

3366+
func (s *drpcSystemAdminServer) RecoveryCollectLocalReplicaInfo(
3367+
request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
3368+
stream serverpb.DRPCAdmin_RecoveryCollectLocalReplicaInfoStream,
3369+
) error {
3370+
return s.recoveryCollectLocalReplicaInfo(request, stream)
3371+
}
3372+
33303373
func (s *systemAdminServer) RecoveryCollectLocalReplicaInfo(
33313374
request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
33323375
stream serverpb.Admin_RecoveryCollectLocalReplicaInfoServer,
3376+
) error {
3377+
return s.recoveryCollectLocalReplicaInfo(request, stream)
3378+
}
3379+
3380+
func (s *systemAdminServer) recoveryCollectLocalReplicaInfo(
3381+
request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
3382+
stream serverpb.RPCAdmin_RecoveryCollectLocalReplicaInfoStream,
33333383
) error {
33343384
ctx := stream.Context()
33353385
ctx = s.server.AnnotateCtx(ctx)
@@ -4043,3 +4093,21 @@ func (s *systemAdminServer) ReadFromTenantInfo(
40434093

40444094
return &serverpb.ReadFromTenantInfoResponse{ReadFrom: dstID, ReadAt: progress.GetStreamIngest().ReplicatedTime}, nil
40454095
}
4096+
4097+
// RecoveryCollectReplicaInfo is unimplemented here because adminServer also
4098+
// have it delegated from embedded serverpb.UnimplementedAdminServer.
4099+
func (s *drpcAdminServer) RecoveryCollectReplicaInfo(
4100+
request *serverpb.RecoveryCollectReplicaInfoRequest,
4101+
stream serverpb.DRPCAdmin_RecoveryCollectReplicaInfoStream,
4102+
) error {
4103+
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
4104+
}
4105+
4106+
// RecoveryCollectLocalReplicaInfo is unimplemented here because adminServer
4107+
// also have it delegated from embedded serverpb.UnimplementedAdminServer.
4108+
func (s *drpcAdminServer) RecoveryCollectLocalReplicaInfo(
4109+
request *serverpb.RecoveryCollectLocalReplicaInfoRequest,
4110+
stream serverpb.DRPCAdmin_RecoveryCollectLocalReplicaInfoStream,
4111+
) error {
4112+
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
4113+
}

pkg/server/drain.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ var (
8888
// instructs the process to terminate.
8989
// This method is part of the serverpb.AdminClient interface.
9090
func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_DrainServer) error {
91+
return s.drain(req, stream)
92+
}
93+
94+
func (s *adminServer) drain(
95+
req *serverpb.DrainRequest, stream serverpb.RPCAdmin_DrainStream,
96+
) error {
9197
ctx := stream.Context()
9298
ctx = s.AnnotateCtx(ctx)
9399

@@ -114,6 +120,22 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
114120
return s.drainServer.handleDrain(ctx, req, stream)
115121
}
116122

123+
// Drain puts the node into the specified drain mode(s) and optionally
124+
// instructs the process to terminate.
125+
func (s *drpcAdminServer) Drain(
126+
req *serverpb.DrainRequest, stream serverpb.DRPCAdmin_DrainStream,
127+
) error {
128+
return s.adminServer.drain(req, stream)
129+
}
130+
131+
// Drain puts the node into the specified drain mode(s) and optionally
132+
// instructs the process to terminate.
133+
func (s *drpcSystemAdminServer) Drain(
134+
req *serverpb.DrainRequest, stream serverpb.DRPCAdmin_DrainStream,
135+
) error {
136+
return s.adminServer.drain(req, stream)
137+
}
138+
117139
type drainServer struct {
118140
stopper *stop.Stopper
119141
// stopTrigger is used to request that the server is shut down.
@@ -162,7 +184,7 @@ func (s *drainServer) setNode(node *Node, nodeLiveness *liveness.NodeLiveness) {
162184
}
163185

164186
func (s *drainServer) handleDrain(
165-
ctx context.Context, req *serverpb.DrainRequest, stream serverpb.Admin_DrainServer,
187+
ctx context.Context, req *serverpb.DrainRequest, stream serverpb.RPCAdmin_DrainStream,
166188
) error {
167189
log.Ops.Infof(ctx, "drain request received with doDrain = %v, shutdown = %v", req.DoDrain, req.Shutdown)
168190

@@ -240,7 +262,7 @@ func delegateDrain(
240262
ctx context.Context,
241263
req *serverpb.DrainRequest,
242264
client serverpb.RPCAdminClient,
243-
stream serverpb.Admin_DrainServer,
265+
stream serverpb.RPCAdmin_DrainStream,
244266
) error {
245267
// Retrieve the stream interface to the target node.
246268
drainClient, err := client.Drain(ctx, req)

pkg/server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
12591259
if err := sTS.RegisterDRPCService(drpcServer); err != nil {
12601260
return nil, err
12611261
}
1262+
if err := sAdmin.RegisterDRPCService(drpcServer); err != nil {
1263+
return nil, err
1264+
}
12621265

12631266
// Tell the node event logger (join, restart) how to populate SQL entries
12641267
// into system.eventlog.

pkg/server/tenant.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,9 @@ func newTenantServer(
486486
if err := args.tenantTimeSeriesServer.RegisterDRPCService(args.drpc); err != nil {
487487
return nil, err
488488
}
489+
if err := sAdmin.RegisterDRPCService(args.drpc); err != nil {
490+
return nil, err
491+
}
489492

490493
// Tell the status/admin servers how to access SQL structures.
491494
sStatus.setStmtDiagnosticsRequester(sqlServer.execCfg.StmtDiagnosticsRecorder)

0 commit comments

Comments
 (0)