@@ -36,6 +36,7 @@ import (
36
36
"github.com/cockroachdb/cockroach/pkg/util/uuid"
37
37
"github.com/cockroachdb/errors"
38
38
"google.golang.org/grpc"
39
+ drpc "storj.io/drpc"
39
40
)
40
41
41
42
const (
@@ -78,7 +79,7 @@ type RaftMessageResponseStream interface {
78
79
// Send. Note that the default implementation of grpc.Stream for server
79
80
// responses (grpc.serverStream) is not safe for concurrent calls to Send.
80
81
type lockedRaftMessageResponseStream struct {
81
- wrapped MultiRaft_RaftMessageBatchServer
82
+ wrapped RPCMultiRaft_RaftMessageBatchStream
82
83
sendMu syncutil.Mutex
83
84
}
84
85
@@ -235,9 +236,7 @@ func NewDummyRaftTransport(
235
236
resolver := func (roachpb.NodeID ) (net.Addr , roachpb.Locality , error ) {
236
237
return nil , roachpb.Locality {}, errors .New ("dummy resolver" )
237
238
}
238
- return NewRaftTransport (ambient , st , nil , clock , nodedialer .New (nil , resolver ), nil ,
239
- nil , nil , nil ,
240
- )
239
+ return NewRaftTransport (ambient , st , nil , clock , nodedialer .New (nil , resolver ), nil , nil , nil , nil , nil )
241
240
}
242
241
243
242
// NewRaftTransport creates a new RaftTransport.
@@ -248,6 +247,7 @@ func NewRaftTransport(
248
247
clock * hlc.Clock ,
249
248
dialer * nodedialer.Dialer ,
250
249
grpcServer * grpc.Server ,
250
+ drpcMux drpc.Mux ,
251
251
piggybackReader node_rac2.PiggybackMsgReader ,
252
252
piggybackedResponseScheduler PiggybackedAdmittedResponseScheduler ,
253
253
knobs * RaftTransportTestingKnobs ,
@@ -271,6 +271,9 @@ func NewRaftTransport(
271
271
if grpcServer != nil {
272
272
RegisterMultiRaftServer (grpcServer , t )
273
273
}
274
+ if drpcMux != nil {
275
+ _ = DRPCRegisterMultiRaft (drpcMux , t .AsDRPCServer ())
276
+ }
274
277
return t
275
278
}
276
279
@@ -375,8 +378,29 @@ func newRaftMessageResponse(
375
378
return resp
376
379
}
377
380
381
+ type drpcRaftTransport RaftTransport
382
+
383
+ // AsDRPCServer returns the DRPC server implementation for the Raft service.
384
+ func (t * RaftTransport ) AsDRPCServer () DRPCMultiRaftServer {
385
+ return (* drpcRaftTransport )(t )
386
+ }
387
+
388
+ // RaftMessageBatch proxies the incoming requests to the listening server interface.
389
+ func (t * drpcRaftTransport ) RaftMessageBatch (
390
+ stream DRPCMultiRaft_RaftMessageBatchStream ,
391
+ ) (lastErr error ) {
392
+ return (* RaftTransport )(t ).raftMessageBatch (stream )
393
+ }
394
+
378
395
// RaftMessageBatch proxies the incoming requests to the listening server interface.
379
396
func (t * RaftTransport ) RaftMessageBatch (stream MultiRaft_RaftMessageBatchServer ) (lastErr error ) {
397
+ return t .raftMessageBatch (stream )
398
+ }
399
+
400
+ // raftMessageBatch is the shared implementation for RaftMessageBatch for both gRPC and DRPC.
401
+ func (t * RaftTransport ) raftMessageBatch (
402
+ stream RPCMultiRaft_RaftMessageBatchStream ,
403
+ ) (lastErr error ) {
380
404
errCh := make (chan error , 1 )
381
405
382
406
// Node stopping error is caught below in the select.
@@ -444,10 +468,25 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer
444
468
}
445
469
}
446
470
471
+ // DelegateRaftSnapshot handles incoming delegated snapshot requests and passes
472
+ // the request to pass off to the sender store. Errors during the snapshots
473
+ // process are sent back as a response.
474
+ func (t * drpcRaftTransport ) DelegateRaftSnapshot (
475
+ stream DRPCMultiRaft_DelegateRaftSnapshotStream ,
476
+ ) error {
477
+ return (* RaftTransport )(t ).delegateRaftSnapshot (stream )
478
+ }
479
+
447
480
// DelegateRaftSnapshot handles incoming delegated snapshot requests and passes
448
481
// the request to pass off to the sender store. Errors during the snapshots
449
482
// process are sent back as a response.
450
483
func (t * RaftTransport ) DelegateRaftSnapshot (stream MultiRaft_DelegateRaftSnapshotServer ) error {
484
+ return t .delegateRaftSnapshot (stream )
485
+ }
486
+
487
+ // delegateRaftSnapshot is the shared implementation for DelegateRaftSnapshot
488
+ // for both gRPC and DRPC.
489
+ func (t * RaftTransport ) delegateRaftSnapshot (stream RPCMultiRaft_DelegateRaftSnapshotStream ) error {
451
490
ctx , cancel := t .stopper .WithCancelOnQuiesce (stream .Context ())
452
491
defer cancel ()
453
492
req , err := stream .Recv ()
@@ -494,8 +533,18 @@ func (t *RaftTransport) InternalDelegateRaftSnapshot(
494
533
return incomingMessageHandler .HandleDelegatedSnapshot (ctx , req )
495
534
}
496
535
536
+ // RaftSnapshot handles incoming streaming snapshot requests.
537
+ func (t * drpcRaftTransport ) RaftSnapshot (stream DRPCMultiRaft_RaftSnapshotStream ) error {
538
+ return (* RaftTransport )(t ).raftSnapshot (stream )
539
+ }
540
+
497
541
// RaftSnapshot handles incoming streaming snapshot requests.
498
542
func (t * RaftTransport ) RaftSnapshot (stream MultiRaft_RaftSnapshotServer ) error {
543
+ return t .raftSnapshot (stream )
544
+ }
545
+
546
+ // raftSnapshot is the shared implementation for RaftSnapshot for both gRPC and DRPC.
547
+ func (t * RaftTransport ) raftSnapshot (stream RPCMultiRaft_RaftSnapshotStream ) error {
499
548
ctx , cancel := t .stopper .WithCancelOnQuiesce (stream .Context ())
500
549
defer cancel ()
501
550
req , err := stream .Recv ()
@@ -547,7 +596,7 @@ func (t *RaftTransport) StopOutgoingMessage(storeID roachpb.StoreID) {
547
596
// lost and a new instance of processQueue will be started by the next message
548
597
// to be sent.
549
598
func (t * RaftTransport ) processQueue (
550
- q * raftSendQueue , stream MultiRaft_RaftMessageBatchClient , _ rpcbase.ConnectionClass ,
599
+ q * raftSendQueue , stream RPCMultiRaft_RaftMessageBatchClient , _ rpcbase.ConnectionClass ,
551
600
) error {
552
601
errCh := make (chan error , 1 )
553
602
0 commit comments