Skip to content

Commit 67d56fa

Browse files
kv,server: extract RangeFeed and register with DRPC server
This is a follow-up to #145195, where we extracted KVBatch from the Internal service. Here, we do the same with RangeFeed. The TL;DR is that smaller services are easier to maintain and can be more smoothly migrated to DRPC. We also enable this service on the DRPC server. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Epic: CRDB-48925 Release note: None
1 parent fafb5e5 commit 67d56fa

File tree

3 files changed

+24
-1
lines changed

3 files changed

+24
-1
lines changed

pkg/kv/kvpb/api.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3883,3 +3883,8 @@ message ScanStats {
38833883
// UsedFollowerRead indicates whether at least some reads were served by the
38843884
// follower replicas.
38853885
message UsedFollowerRead {}
3886+
3887+
// RangeFeed service implemented by nodes for KV API requests.
3888+
service RangeFeed {
3889+
rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {}
3890+
}

pkg/server/node.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1918,6 +1918,16 @@ func (n *kvBatchServer) BatchStream(stream kvpb.DRPCKVBatch_BatchStreamStream) e
19181918
return (*Node)(n).batchStreamImpl(stream)
19191919
}
19201920

1921+
type kvRangeFeedServer Node
1922+
1923+
func (n *Node) AsDRPCRangeFeedServer() kvpb.DRPCRangeFeedServer {
1924+
return (*kvRangeFeedServer)(n)
1925+
}
1926+
1927+
func (n *kvRangeFeedServer) MuxRangeFeed(stream kvpb.DRPCRangeFeed_MuxRangeFeedStream) error {
1928+
return (*Node)(n).muxRangeFeed(stream)
1929+
}
1930+
19211931
// spanForRequest is the retval of setupSpanForIncomingRPC. It groups together a
19221932
// few variables needed when finishing an RPC's span.
19231933
//
@@ -2092,7 +2102,7 @@ func (n *Node) RangeLookup(
20922102
// MuxRangeFeedServer (default grpc.Stream) is not safe for concurrent calls to
20932103
// Send.
20942104
type lockedMuxStream struct {
2095-
wrapped kvpb.Internal_MuxRangeFeedServer
2105+
wrapped kvpb.RPCInternal_MuxRangeFeedStream
20962106
sendMu syncutil.Mutex
20972107
metrics *rangefeed.LockedMuxStreamMetrics
20982108
}
@@ -2188,6 +2198,11 @@ func (n *Node) defaultRangefeedConsumerID() int64 {
21882198

21892199
// MuxRangeFeed implements the roachpb.InternalServer interface.
21902200
func (n *Node) MuxRangeFeed(muxStream kvpb.Internal_MuxRangeFeedServer) error {
2201+
return n.muxRangeFeed(muxStream)
2202+
}
2203+
2204+
// MuxRangeFeed implements the roachpb.InternalServer interface.
2205+
func (n *Node) muxRangeFeed(muxStream kvpb.RPCInternal_MuxRangeFeedStream) error {
21912206
lockedMuxStream := &lockedMuxStream{
21922207
wrapped: muxStream,
21932208
metrics: n.metrics.LockedMuxStreamMetrics,

pkg/server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
986986
if err := kvpb.DRPCRegisterKVBatch(drpcServer, node.AsDRPCKVBatchServer()); err != nil {
987987
return nil, err
988988
}
989+
if err := kvpb.DRPCRegisterRangeFeed(drpcServer, node.AsDRPCRangeFeedServer()); err != nil {
990+
return nil, err
991+
}
989992
if err := kvpb.DRPCRegisterTenantService(drpcServer, node.AsDRPCTenantServiceServer()); err != nil {
990993
return nil, err
991994
}

0 commit comments

Comments
 (0)