Skip to content

Commit 8b21eda

Browse files
committed
kv: extract TenantService and register with DRPC server
This is a follow-up to #145195, where we extracted `KVBatch` from the `Internal` service. Here, we do the same with `TenantService`. The TL;DR is that smaller services are easier to maintain and can be more smoothly migrated to DRPC. We also enable this service on the DRPC server. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Epic: CRDB-48925 Release note: None
1 parent ca8c174 commit 8b21eda

File tree

3 files changed

+84
-9
lines changed

3 files changed

+84
-9
lines changed

pkg/kv/kvpb/api.proto

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3650,19 +3650,40 @@ message JoinNodeResponse {
36503650
roachpb.Version active_version = 4;
36513651
}
36523652

3653+
// TODO(server): Remove the methods of these serivces from the 'Internal'
3654+
// service once the migration to DRPC complete, making these service the sole
3655+
// definition. Also, update the comments here.
3656+
3657+
// TODO(server,kv): Improve the code comment documentation of these methods.
3658+
// Many of these methods are simply undocumented.
3659+
36533660
// KVBatch defines Batch RPCs of Internal service (see below) for DRPC use only.
36543661
//
36553662
// During the gRPC to DRPC migration:
36563663
// - gRPC clients will continue to use the full 'Internal' service client.
36573664
// - DRPC clients targeting Batch* RPCs will use clients generated from this.
3658-
//
3659-
// TODO(server): Remove these methods from the 'Internal' service once the
3660-
// migration to DRPC complete, making this service the sole definition.
36613665
service KVBatch {
36623666
rpc Batch(BatchRequest) returns (BatchResponse) {}
36633667
rpc BatchStream(stream BatchRequest) returns (stream BatchResponse) {}
36643668
}
36653669

3670+
// TenantService provides RPCs for secondary tenants to access data that is
3671+
// exclusively available in the system tenant. This service is used by the
3672+
// tenant connector.
3673+
service TenantService {
3674+
// TenantSettings is used by tenants to obtain and stay up to date with tenant
3675+
// setting overrides.
3676+
rpc TenantSettings(TenantSettingsRequest) returns (stream TenantSettingsEvent) {}
3677+
3678+
rpc GossipSubscription(GossipSubscriptionRequest) returns (stream GossipSubscriptionEvent) {}
3679+
3680+
rpc RangeLookup(RangeLookupRequest) returns (RangeLookupResponse) {}
3681+
3682+
// GetRangeDescriptors is used by tenants to get range descriptors for their
3683+
// own ranges.
3684+
rpc GetRangeDescriptors(GetRangeDescriptorsRequest) returns (stream GetRangeDescriptorsResponse) {}
3685+
}
3686+
36663687
// Batch and RangeFeed service implemented by nodes for KV API requests.
36673688
service Internal {
36683689
rpc Batch (BatchRequest) returns (BatchResponse) {}

pkg/server/node.go

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1899,19 +1899,19 @@ func (n *Node) batchStreamImpl(stream kvpb.RPCKVBatch_BatchStreamStream) error {
18991899
}
19001900
}
19011901

1902+
type kvBatchServer Node
1903+
19021904
func (n *Node) AsDRPCKVBatchServer() kvpb.DRPCKVBatchServer {
1903-
return (*drpcNode)(n)
1905+
return (*kvBatchServer)(n)
19041906
}
19051907

1906-
type drpcNode Node
1907-
1908-
func (n *drpcNode) Batch(
1908+
func (n *kvBatchServer) Batch(
19091909
ctx context.Context, request *kvpb.BatchRequest,
19101910
) (*kvpb.BatchResponse, error) {
19111911
return (*Node)(n).Batch(ctx, request)
19121912
}
19131913

1914-
func (n *drpcNode) BatchStream(stream kvpb.DRPCKVBatch_BatchStreamStream) error {
1914+
func (n *kvBatchServer) BatchStream(stream kvpb.DRPCKVBatch_BatchStreamStream) error {
19151915
return (*Node)(n).batchStreamImpl(stream)
19161916
}
19171917

@@ -2055,7 +2055,6 @@ func filterRangeLookupResponseForTenant(
20552055
return truncated
20562056
}
20572057

2058-
// RangeLookup implements the kvpb.InternalServer interface.
20592058
func (n *Node) RangeLookup(
20602059
ctx context.Context, req *kvpb.RangeLookupRequest,
20612060
) (*kvpb.RangeLookupResponse, error) {
@@ -2392,6 +2391,12 @@ func (n *Node) ResetQuorum(
23922391
// GossipSubscription implements the kvpb.InternalServer interface.
23932392
func (n *Node) GossipSubscription(
23942393
args *kvpb.GossipSubscriptionRequest, stream kvpb.Internal_GossipSubscriptionServer,
2394+
) error {
2395+
return n.gossipSubscription(args, stream)
2396+
}
2397+
2398+
func (n *Node) gossipSubscription(
2399+
args *kvpb.GossipSubscriptionRequest, stream kvpb.RPCInternal_GossipSubscriptionStream,
23952400
) error {
23962401
ctx := n.storeCfg.AmbientCtx.AnnotateCtx(stream.Context())
23972402
ctxDone := ctx.Done()
@@ -2481,6 +2486,12 @@ func (n *Node) waitForTenantWatcherReadiness(
24812486
// TenantSettings implements the kvpb.InternalServer interface.
24822487
func (n *Node) TenantSettings(
24832488
args *kvpb.TenantSettingsRequest, stream kvpb.Internal_TenantSettingsServer,
2489+
) error {
2490+
return n.tenantSettings(args, stream)
2491+
}
2492+
2493+
func (n *Node) tenantSettings(
2494+
args *kvpb.TenantSettingsRequest, stream kvpb.RPCTenantService_TenantSettingsStream,
24842495
) error {
24852496
ctx := n.storeCfg.AmbientCtx.AnnotateCtx(stream.Context())
24862497
ctxDone := ctx.Done()
@@ -2888,6 +2899,12 @@ func (n *Node) SpanConfigConformance(
28882899
func (n *Node) GetRangeDescriptors(
28892900
args *kvpb.GetRangeDescriptorsRequest, stream kvpb.Internal_GetRangeDescriptorsServer,
28902901
) error {
2902+
return n.getRangeDescriptors(args, stream)
2903+
}
2904+
2905+
func (n *Node) getRangeDescriptors(
2906+
args *kvpb.GetRangeDescriptorsRequest, stream kvpb.RPCTenantService_GetRangeDescriptorsStream,
2907+
) error {
28912908

28922909
iter, err := n.execCfg.RangeDescIteratorFactory.NewLazyIterator(stream.Context(), args.Span, int(args.BatchSize))
28932910
if err != nil {
@@ -2914,3 +2931,37 @@ func (n *Node) GetRangeDescriptors(
29142931
RangeDescriptors: rangeDescriptors,
29152932
})
29162933
}
2934+
2935+
type tenantServiceServer Node
2936+
2937+
func (n *Node) AsDRPCTenantServiceServer() kvpb.DRPCTenantServiceServer {
2938+
return (*tenantServiceServer)(n)
2939+
}
2940+
2941+
// GossipSubscription implements the kvpb.DRPCTenantServiceServer interface.
2942+
func (n *tenantServiceServer) GossipSubscription(
2943+
args *kvpb.GossipSubscriptionRequest, stream kvpb.DRPCTenantService_GossipSubscriptionStream,
2944+
) error {
2945+
return (*Node)(n).gossipSubscription(args, stream)
2946+
}
2947+
2948+
// TenantSettings implements the kvpb.DRPCTenantServiceServer interface.
2949+
func (n *tenantServiceServer) TenantSettings(
2950+
args *kvpb.TenantSettingsRequest, stream kvpb.DRPCTenantService_TenantSettingsStream,
2951+
) error {
2952+
return (*Node)(n).tenantSettings(args, stream)
2953+
}
2954+
2955+
// RangeLookup implements the kvpb.DRPCTenantServiceServer interface.
2956+
func (n *tenantServiceServer) RangeLookup(
2957+
ctx context.Context, req *kvpb.RangeLookupRequest,
2958+
) (*kvpb.RangeLookupResponse, error) {
2959+
return (*Node)(n).RangeLookup(ctx, req)
2960+
}
2961+
2962+
// GetRangeDescriptors implements the kvpb.DRPCTenantServiceServer interface.
2963+
func (n *tenantServiceServer) GetRangeDescriptors(
2964+
args *kvpb.GetRangeDescriptorsRequest, stream kvpb.DRPCTenantService_GetRangeDescriptorsStream,
2965+
) error {
2966+
return (*Node)(n).getRangeDescriptors(args, stream)
2967+
}

pkg/server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
986986
if err := kvpb.DRPCRegisterKVBatch(drpcServer, node.AsDRPCKVBatchServer()); err != nil {
987987
return nil, err
988988
}
989+
if err := kvpb.DRPCRegisterTenantService(drpcServer, node.AsDRPCTenantServiceServer()); err != nil {
990+
return nil, err
991+
}
989992
kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer)
990993
if err := kvserver.DRPCRegisterPerReplica(drpcServer, node.perReplicaServer); err != nil {
991994
return nil, err

0 commit comments

Comments
 (0)