Skip to content

Commit ca8c174

Browse files
craig[bot]shubhamdhama
andcommitted
Merge #148352
148352: kvserver: register `SideTransport` service with DRPC server r=cthumuluru-crdb a=shubhamdhama Enable the `SideTransport` service on the DRPC server in addition to gRPC. This is controlled by `rpc.experimental_drpc.enabled` (off by default). This change is part of a series and is similar to: #146926 Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Epic: CRDB-48925 Release note: None Co-authored-by: Shubham Dhama <[email protected]>
2 parents 4c2a546 + 7e5b067 commit ca8c174

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

pkg/kv/kvserver/closedts/sidetransport/receiver.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,23 @@ func NewReceiver(
7676
return r
7777
}
7878

79+
func (s *Receiver) AsDRPCServer() ctpb.DRPCSideTransportServer {
80+
return (*drpcReceiver)(s)
81+
}
82+
83+
type drpcReceiver Receiver
84+
85+
// PushUpdates is the streaming RPC handler.
86+
func (s *drpcReceiver) PushUpdates(stream ctpb.DRPCSideTransport_PushUpdatesStream) error {
87+
return (*Receiver)(s).pushUpdates(stream)
88+
}
89+
7990
// PushUpdates is the streaming RPC handler.
8091
func (s *Receiver) PushUpdates(stream ctpb.SideTransport_PushUpdatesServer) error {
92+
return s.pushUpdates(stream)
93+
}
94+
95+
func (s *Receiver) pushUpdates(stream ctpb.RPCSideTransport_PushUpdatesStream) error {
8196
// Create a steam to service this connection. The stream will call back into
8297
// the Receiver through onFirstMsg to register itself once it finds out the
8398
// sender's node id.
@@ -288,7 +303,7 @@ func (r *incomingStream) Run(
288303
ctx context.Context,
289304
stopper *stop.Stopper,
290305
// The gRPC stream with incoming messages.
291-
stream ctpb.SideTransport_PushUpdatesServer,
306+
stream ctpb.RPCSideTransport_PushUpdatesStream,
292307
) error {
293308
// We have to do the stream processing on a separate goroutine because Recv()
294309
// is blocking, with no way to interrupt it other than returning from the RPC

pkg/server/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
995995
return nil, err
996996
}
997997
ctpb.RegisterSideTransportServer(grpcServer.Server, ctReceiver)
998+
if err := ctpb.DRPCRegisterSideTransport(drpcServer, ctReceiver.AsDRPCServer()); err != nil {
999+
return nil, err
1000+
}
9981001

9991002
// Create blob service for inter-node file sharing.
10001003
blobService, err := blobs.NewBlobService(cfg.ExternalIODir)

0 commit comments

Comments
 (0)