Skip to content

Commit 0ae584a

Browse files
craig[bot]shubhamdhama
andcommitted
Merge #146926
146926: kvserver: register `MultiRaft` service with DRPC server r=cthumuluru-crdb a=shubhamdhama Enable MultiRaft service on the DRPC server in addition to gRPC. Controlled by `rpc.experimental_drpc.enabled` (off by default). To enable this change, we used the interfaces introduced in cockroachdb/drpc#2. For example, for server-side stream implementations, we used `RPCMultiRaft_RaftMessageBatchStream`, which is the common interface between `DRPCMultiRaft_RaftMessageBatchStream` (for DRPC) and `MultiRaft_RaftMessageBatchServer ` (for gRPC). Note: This only registers the service; the client is not updated to use the DRPC client, so this service will not have any functional effect. Fixes: #146473 Epic: CRDB-48925 Release note: None Co-authored-by: Shubham Dhama <[email protected]>
2 parents c7d1e2f + 4ad0339 commit 0ae584a

File tree

8 files changed

+73
-9
lines changed

8 files changed

+73
-9
lines changed

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ go_library(
262262
"@com_github_prometheus_client_model//go",
263263
"@com_github_raduberinde_btreemap//:btreemap",
264264
"@io_opentelemetry_go_otel//attribute",
265+
"@io_storj_drpc//:drpc",
265266
"@org_golang_google_grpc//:grpc",
266267
"@org_golang_x_time//rate",
267268
],

pkg/kv/kvserver/client_merge_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2482,6 +2482,7 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
24822482
nodedialer.New(tc.Servers[0].RPCContext(),
24832483
gossip.AddressResolver(tc.Servers[0].GossipI().(*gossip.Gossip))),
24842484
nil, /* grpcServer */
2485+
nil, /* drpcServer */
24852486
(*node_rac2.AdmittedPiggybacker)(nil),
24862487
nil, /* PiggybackedAdmittedResponseScheduler */
24872488
nil, /* knobs */

pkg/kv/kvserver/client_raft_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3436,6 +3436,7 @@ func TestReplicaGCRace(t *testing.T) {
34363436
tc.Servers[0].Clock(),
34373437
nodedialer.New(tc.Servers[0].RPCContext(), gossip.AddressResolver(fromStore.Gossip())),
34383438
nil, /* grpcServer */
3439+
nil, /* drpcServer */
34393440
(*node_rac2.AdmittedPiggybacker)(nil),
34403441
nil, /* PiggybackedAdmittedResponseScheduler */
34413442
nil, /* knobs */
@@ -3828,6 +3829,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
38283829
nodedialer.New(tc.Servers[0].RPCContext(),
38293830
gossip.AddressResolver(tc.GetFirstStoreFromServer(t, 0).Gossip())),
38303831
nil, /* grpcServer */
3832+
nil, /* drpcServer */
38313833
(*node_rac2.AdmittedPiggybacker)(nil),
38323834
nil, /* PiggybackedAdmittedResponseScheduler */
38333835
nil, /* knobs */

pkg/kv/kvserver/raft_transport.go

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3737
"github.com/cockroachdb/errors"
3838
"google.golang.org/grpc"
39+
drpc "storj.io/drpc"
3940
)
4041

4142
const (
@@ -78,7 +79,7 @@ type RaftMessageResponseStream interface {
7879
// Send. Note that the default implementation of grpc.Stream for server
7980
// responses (grpc.serverStream) is not safe for concurrent calls to Send.
8081
type lockedRaftMessageResponseStream struct {
81-
wrapped MultiRaft_RaftMessageBatchServer
82+
wrapped RPCMultiRaft_RaftMessageBatchStream
8283
sendMu syncutil.Mutex
8384
}
8485

@@ -235,9 +236,7 @@ func NewDummyRaftTransport(
235236
resolver := func(roachpb.NodeID) (net.Addr, roachpb.Locality, error) {
236237
return nil, roachpb.Locality{}, errors.New("dummy resolver")
237238
}
238-
return NewRaftTransport(ambient, st, nil, clock, nodedialer.New(nil, resolver), nil,
239-
nil, nil, nil,
240-
)
239+
return NewRaftTransport(ambient, st, nil, clock, nodedialer.New(nil, resolver), nil, nil, nil, nil, nil)
241240
}
242241

243242
// NewRaftTransport creates a new RaftTransport.
@@ -248,6 +247,7 @@ func NewRaftTransport(
248247
clock *hlc.Clock,
249248
dialer *nodedialer.Dialer,
250249
grpcServer *grpc.Server,
250+
drpcMux drpc.Mux,
251251
piggybackReader node_rac2.PiggybackMsgReader,
252252
piggybackedResponseScheduler PiggybackedAdmittedResponseScheduler,
253253
knobs *RaftTransportTestingKnobs,
@@ -271,6 +271,9 @@ func NewRaftTransport(
271271
if grpcServer != nil {
272272
RegisterMultiRaftServer(grpcServer, t)
273273
}
274+
if drpcMux != nil {
275+
_ = DRPCRegisterMultiRaft(drpcMux, t.AsDRPCServer())
276+
}
274277
return t
275278
}
276279

@@ -375,8 +378,29 @@ func newRaftMessageResponse(
375378
return resp
376379
}
377380

381+
type drpcRaftTransport RaftTransport
382+
383+
// AsDRPCServer returns the DRPC server implementation for the Raft service.
384+
func (t *RaftTransport) AsDRPCServer() DRPCMultiRaftServer {
385+
return (*drpcRaftTransport)(t)
386+
}
387+
388+
// RaftMessageBatch proxies the incoming requests to the listening server interface.
389+
func (t *drpcRaftTransport) RaftMessageBatch(
390+
stream DRPCMultiRaft_RaftMessageBatchStream,
391+
) (lastErr error) {
392+
return (*RaftTransport)(t).raftMessageBatch(stream)
393+
}
394+
378395
// RaftMessageBatch proxies the incoming requests to the listening server interface.
379396
func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer) (lastErr error) {
397+
return t.raftMessageBatch(stream)
398+
}
399+
400+
// raftMessageBatch is the shared implementation for RaftMessageBatch for both gRPC and DRPC.
401+
func (t *RaftTransport) raftMessageBatch(
402+
stream RPCMultiRaft_RaftMessageBatchStream,
403+
) (lastErr error) {
380404
errCh := make(chan error, 1)
381405

382406
// Node stopping error is caught below in the select.
@@ -445,10 +469,25 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
445469
}
446470
}
447471

472+
// DelegateRaftSnapshot handles incoming delegated snapshot requests and passes
473+
// the request to pass off to the sender store. Errors during the snapshots
474+
// process are sent back as a response.
475+
func (t *drpcRaftTransport) DelegateRaftSnapshot(
476+
stream DRPCMultiRaft_DelegateRaftSnapshotStream,
477+
) error {
478+
return (*RaftTransport)(t).delegateRaftSnapshot(stream)
479+
}
480+
448481
// DelegateRaftSnapshot handles incoming delegated snapshot requests and passes
449482
// the request to pass off to the sender store. Errors during the snapshots
450483
// process are sent back as a response.
451484
func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapshotServer) error {
485+
return t.delegateRaftSnapshot(stream)
486+
}
487+
488+
// delegateRaftSnapshot is the shared implementation for DelegateRaftSnapshot
489+
// for both gRPC and DRPC.
490+
func (t *RaftTransport) delegateRaftSnapshot(stream RPCMultiRaft_DelegateRaftSnapshotStream) error {
452491
ctx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context())
453492
defer cancel()
454493
req, err := stream.Recv()
@@ -495,8 +534,18 @@ func (t *RaftTransport) InternalDelegateRaftSnapshot(
495534
return incomingMessageHandler.HandleDelegatedSnapshot(ctx, req)
496535
}
497536

537+
// RaftSnapshot handles incoming streaming snapshot requests.
538+
func (t *drpcRaftTransport) RaftSnapshot(stream DRPCMultiRaft_RaftSnapshotStream) error {
539+
return (*RaftTransport)(t).raftSnapshot(stream)
540+
}
541+
498542
// RaftSnapshot handles incoming streaming snapshot requests.
499543
func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error {
544+
return t.raftSnapshot(stream)
545+
}
546+
547+
// raftSnapshot is the shared implementation for RaftSnapshot for both gRPC and DRPC.
548+
func (t *RaftTransport) raftSnapshot(stream RPCMultiRaft_RaftSnapshotStream) error {
500549
ctx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context())
501550
defer cancel()
502551
req, err := stream.Recv()
@@ -548,7 +597,7 @@ func (t *RaftTransport) StopOutgoingMessage(storeID roachpb.StoreID) {
548597
// lost and a new instance of processQueue will be started by the next message
549598
// to be sent.
550599
func (t *RaftTransport) processQueue(
551-
q *raftSendQueue, stream MultiRaft_RaftMessageBatchClient, _ rpcbase.ConnectionClass,
600+
q *raftSendQueue, stream RPCMultiRaft_RaftMessageBatchClient, _ rpcbase.ConnectionClass,
552601
) error {
553602
errCh := make(chan error, 1)
554603

pkg/kv/kvserver/raft_transport_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,10 @@ func (rttc *raftTransportTestContext) AddNodeWithoutGossip(
182182
manual := hlc.NewHybridManualClock()
183183
clock := hlc.NewClockForTesting(manual)
184184
rttc.clocks[nodeID] = clockWithManualSource{manual: manual, clock: clock}
185-
grpcServer, err := rpc.NewServer(context.Background(), rttc.nodeRPCContext)
185+
ctx := context.Background()
186+
grpcServer, err := rpc.NewServer(ctx, rttc.nodeRPCContext)
187+
require.NoError(rttc.t, err)
188+
drpcServer, err := rpc.NewDRPCServer(ctx, rttc.nodeRPCContext)
186189
require.NoError(rttc.t, err)
187190
transport := kvserver.NewRaftTransport(
188191
log.MakeTestingAmbientCtxWithNewTracer(),
@@ -191,6 +194,7 @@ func (rttc *raftTransportTestContext) AddNodeWithoutGossip(
191194
clock,
192195
nodedialer.New(rttc.nodeRPCContext, gossip.AddressResolver(rttc.gossip)),
193196
grpcServer,
197+
drpcServer,
194198
piggybacker,
195199
piggybackedResponseScheduler,
196200
knobs,

pkg/kv/kvserver/raft_transport_unit_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ func TestRaftTransportStartNewQueue(t *testing.T) {
5353

5454
grpcServer, err := rpc.NewServer(ctx, rpcC)
5555
require.NoError(t, err)
56+
drpcServer, err := rpc.NewDRPCServer(ctx, rpcC)
57+
require.NoError(t, err)
5658
// RegisterMultiRaftServer(grpcServer, mrs)
5759

5860
var addr net.Addr
@@ -71,6 +73,7 @@ func TestRaftTransportStartNewQueue(t *testing.T) {
7173
hlc.NewClockForTesting(nil),
7274
nodedialer.New(rpcC, resolver),
7375
grpcServer,
76+
drpcServer,
7477
(*node_rac2.AdmittedPiggybacker)(nil),
7578
nil, /* PiggybackedAdmittedResponseScheduler */
7679
nil, /* knobs */

pkg/kv/kvserver/store_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ func createTestStoreWithoutStart(
186186
}
187187
rpcContext := rpc.NewContext(ctx, rpcOpts)
188188
stopper.SetTracer(cfg.AmbientCtx.Tracer)
189-
server, err := rpc.NewServer(ctx, rpcContext) // never started
189+
grpcServer, err := rpc.NewServer(ctx, rpcContext) // never started
190+
require.NoError(t, err)
191+
drpcServer, err := rpc.NewDRPCServer(ctx, rpcContext) // never started
190192
require.NoError(t, err)
191193

192194
// Some tests inject their own Gossip and StorePool, via
@@ -235,7 +237,8 @@ func createTestStoreWithoutStart(
235237
stopper,
236238
cfg.Clock,
237239
cfg.NodeDialer,
238-
server,
240+
grpcServer,
241+
drpcServer,
239242
(*node_rac2.AdmittedPiggybacker)(nil),
240243
nil, /* PiggybackedAdmittedResponseScheduler */
241244
nil, /* knobs */
@@ -246,7 +249,7 @@ func createTestStoreWithoutStart(
246249
supportGracePeriod := rpcContext.StoreLivenessWithdrawalGracePeriod()
247250
options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod)
248251
transport := storeliveness.NewTransport(
249-
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, server, nil, /* knobs */
252+
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, nil, /* knobs */
250253
)
251254
knobs := cfg.TestingKnobs.StoreLivenessKnobs
252255
cfg.StoreLiveness = storeliveness.NewNodeContainer(stopper, options, transport, knobs)

pkg/server/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
645645
clock,
646646
kvNodeDialer,
647647
grpcServer.Server,
648+
drpcServer.DRPCServer,
648649
admittedPiggybacker,
649650
storesForRACv2,
650651
raftTransportKnobs,

0 commit comments

Comments
 (0)