Skip to content

Commit eab29dd

Browse files
kv: use generic dialer to create RPC client
Changes in this PR are a followup to #152948. Epic: CRDB-48934 Informs: None Release note: None
1 parent fe3f346 commit eab29dd

File tree

5 files changed

+23
-32
lines changed

5 files changed

+23
-32
lines changed

pkg/kv/kvserver/closedts/ctpb/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"//pkg/kv/kvpb",
1616
"//pkg/roachpb",
1717
"//pkg/rpc/rpcbase",
18+
"//pkg/settings/cluster",
1819
"//pkg/util/timeutil",
1920
],
2021
)

pkg/kv/kvserver/closedts/ctpb/rpc_clients.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,18 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/roachpb"
1212
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
13+
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1314
)
1415

1516
// DialSideTransportClient establishes a DRPC connection if enabled; otherwise,
1617
// it falls back to gRPC. The established connection is used to create a
1718
// RPCSideTransportClient.
1819
func DialSideTransportClient(
19-
nd rpcbase.NodeDialer, ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
20+
nd rpcbase.NodeDialer,
21+
ctx context.Context,
22+
nodeID roachpb.NodeID,
23+
class rpcbase.ConnectionClass,
24+
cs *cluster.Settings,
2025
) (RPCSideTransportClient, error) {
21-
if !rpcbase.TODODRPC {
22-
conn, err := nd.Dial(ctx, nodeID, class)
23-
if err != nil {
24-
return nil, err
25-
}
26-
return NewGRPCSideTransportClientAdapter(conn), nil
27-
}
28-
conn, err := nd.DRPCDial(ctx, nodeID, class)
29-
if err != nil {
30-
return nil, err
31-
}
32-
return NewDRPCSideTransportClientAdapter(conn), nil
26+
return rpcbase.DialRPCClient(nd, ctx, nodeID, class, NewGRPCSideTransportClientAdapter, NewDRPCSideTransportClientAdapter, cs)
3327
}

pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ go_library(
3030
"//pkg/util/syncutil",
3131
"//pkg/util/timeutil",
3232
"@com_github_cockroachdb_errors//:errors",
33-
"@io_storj_drpc//:drpc",
34-
"@org_golang_google_grpc//:grpc",
3533
],
3634
)
3735

pkg/kv/kvserver/closedts/sidetransport/sender.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ import (
3535
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3636
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3737
"github.com/cockroachdb/errors"
38-
"google.golang.org/grpc"
39-
"storj.io/drpc"
4038
)
4139

4240
// Sender represents the sending-side of the closed timestamps "side-transport".
@@ -687,11 +685,11 @@ type conn interface {
687685
// rpcConnFactory is an implementation of connFactory that establishes
688686
// connections to other nodes using gRPC.
689687
type rpcConnFactory struct {
690-
dialer nodeDialer
688+
dialer rpcbase.NodeDialer
691689
testingKnobs connTestingKnobs
692690
}
693691

694-
func newRPCConnFactory(dialer nodeDialer, testingKnobs connTestingKnobs) connFactory {
692+
func newRPCConnFactory(dialer rpcbase.NodeDialer, testingKnobs connTestingKnobs) connFactory {
695693
return &rpcConnFactory{
696694
dialer: dialer,
697695
testingKnobs: testingKnobs,
@@ -703,12 +701,6 @@ func (f *rpcConnFactory) new(s *Sender, nodeID roachpb.NodeID) conn {
703701
return newRPCConn(f.dialer, s, nodeID, f.testingKnobs)
704702
}
705703

706-
// nodeDialer abstracts *nodedialer.Dialer.
707-
type nodeDialer interface {
708-
Dial(ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass) (_ *grpc.ClientConn, err error)
709-
DRPCDial(ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass) (_ drpc.Conn, err error)
710-
}
711-
712704
// On sending errors, we sleep a bit as to not spin on a tripped
713705
// circuit-breaker in the Dialer.
714706
const sleepOnErr = time.Second
@@ -720,7 +712,7 @@ const sleepOnErr = time.Second
720712
// snapshot before we can resume sending regular messages.
721713
type rpcConn struct {
722714
log.AmbientContext
723-
dialer nodeDialer
715+
dialer rpcbase.NodeDialer
724716
producer *Sender
725717
nodeID roachpb.NodeID
726718
testingKnobs connTestingKnobs
@@ -739,7 +731,7 @@ type rpcConn struct {
739731
}
740732

741733
func newRPCConn(
742-
dialer nodeDialer, producer *Sender, nodeID roachpb.NodeID, testingKnobs connTestingKnobs,
734+
dialer rpcbase.NodeDialer, producer *Sender, nodeID roachpb.NodeID, testingKnobs connTestingKnobs,
743735
) conn {
744736
r := &rpcConn{
745737
dialer: dialer,
@@ -791,7 +783,7 @@ func (r *rpcConn) maybeConnect(ctx context.Context, _ *stop.Stopper) error {
791783
return nil
792784
}
793785

794-
client, err := ctpb.DialSideTransportClient(r.dialer, ctx, r.nodeID, rpcbase.SystemClass)
786+
client, err := ctpb.DialSideTransportClient(r.dialer, ctx, r.nodeID, rpcbase.SystemClass, r.producer.st)
795787
if err != nil {
796788
return err
797789
}

pkg/kv/kvserver/closedts/sidetransport/sender_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ type mockDialer struct {
622622
}
623623
}
624624

625-
var _ nodeDialer = &mockDialer{}
625+
var _ rpcbase.NodeDialer = &mockDialer{}
626626

627627
type nodeAddr struct {
628628
nid roachpb.NodeID
@@ -825,7 +825,7 @@ type failingDialer struct {
825825
dialCount int32
826826
}
827827

828-
var _ nodeDialer = &failingDialer{}
828+
var _ rpcbase.NodeDialer = &failingDialer{}
829829

830830
func (f *failingDialer) Dial(
831831
ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
@@ -858,8 +858,14 @@ func TestRPCConnStopOnClose(t *testing.T) {
858858

859859
dialer := &failingDialer{}
860860
factory := newRPCConnFactory(dialer, connTestingKnobs{sleepOnErrOverride: sleepTime})
861-
connection := factory.new(nil, /* sender is not needed as dialer always fails Dial attempts */
862-
roachpb.NodeID(1))
861+
862+
s, stopper := newMockSender(factory)
863+
defer stopper.Stop(ctx)
864+
865+
// While sender is strictly not needed to dial a connection as dialer
866+
// always fails dial attempts, it is needed to check if DRPC is enabled
867+
// or disabled.
868+
connection := factory.new(s, roachpb.NodeID(1))
863869
connection.run(ctx, stopper)
864870

865871
// Wait for first dial attempt for sanity reasons.

0 commit comments

Comments
 (0)