Skip to content

Commit b0b5d89

Browse files
craig[bot]cthumuluru-crdb
andcommitted
Merge #148610
148610: *: add DRPC dial methods to nodedialer r=cthumuluru-crdb a=cthumuluru-crdb This change enables nodedialer to dial either a gRPC or a DRPC connection, depending on `rpc.experimental_drpc.enabled` setting. Epic: CRDB-48923 Fixes: none Release note: none Co-authored-by: Chandra Thumuluru <[email protected]>
2 parents a53b28b + 50dfcb6 commit b0b5d89

File tree

8 files changed

+61
-0
lines changed

8 files changed

+61
-0
lines changed

pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ go_library(
3030
"//pkg/util/syncutil",
3131
"//pkg/util/timeutil",
3232
"@com_github_cockroachdb_errors//:errors",
33+
"@io_storj_drpc//:drpc",
3334
"@org_golang_google_grpc//:grpc",
3435
],
3536
)
@@ -59,6 +60,7 @@ go_test(
5960
"//pkg/util/syncutil",
6061
"@com_github_cockroachdb_errors//:errors",
6162
"@com_github_stretchr_testify//require",
63+
"@io_storj_drpc//:drpc",
6264
"@org_golang_google_grpc//:grpc",
6365
],
6466
)

pkg/kv/kvserver/closedts/sidetransport/sender.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3737
"github.com/cockroachdb/errors"
3838
"google.golang.org/grpc"
39+
"storj.io/drpc"
3940
)
4041

4142
// Sender represents the sending-side of the closed timestamps "side-transport".
@@ -705,6 +706,7 @@ func (f *rpcConnFactory) new(s *Sender, nodeID roachpb.NodeID) conn {
705706
// nodeDialer abstracts *nodedialer.Dialer.
706707
type nodeDialer interface {
707708
Dial(ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass) (_ *grpc.ClientConn, err error)
709+
DRPCDial(ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass) (_ drpc.Conn, err error)
708710
}
709711

710712
// On sending errors, we sleep a bit as to not spin on a tripped

pkg/kv/kvserver/closedts/sidetransport/sender_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/cockroachdb/errors"
3232
"github.com/stretchr/testify/require"
3333
"google.golang.org/grpc"
34+
"storj.io/drpc"
3435
)
3536

3637
// mockReplica is a mock implementation of the Replica interface.
@@ -661,6 +662,12 @@ func (m *mockDialer) Dial(
661662
return c, err
662663
}
663664

665+
func (m *mockDialer) DRPCDial(
666+
ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
667+
) (_ drpc.Conn, _ error) {
668+
return nil, errors.New("DRPCDial unimplemented")
669+
}
670+
664671
func (m *mockDialer) Close() {
665672
m.mu.Lock()
666673
defer m.mu.Unlock()
@@ -827,6 +834,12 @@ func (f *failingDialer) Dial(
827834
return nil, errors.New("failingDialer")
828835
}
829836

837+
func (f *failingDialer) DRPCDial(
838+
ctx context.Context, nodeID roachpb.NodeID, class rpcbase.ConnectionClass,
839+
) (_ drpc.Conn, err error) {
840+
return nil, errors.New("DRPCDial unimplemented")
841+
}
842+
830843
func (f *failingDialer) callCount() int32 {
831844
return atomic.LoadInt32(&f.dialCount)
832845
}

pkg/rpc/rpcbase/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ go_library(
2626
"//pkg/roachpb",
2727
"//pkg/rpc/rpcpb",
2828
"//pkg/util/envutil",
29+
"@io_storj_drpc//:drpc",
2930
"@org_golang_google_grpc//:grpc",
3031
],
3132
)

pkg/rpc/rpcbase/nodedialer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/cockroachdb/cockroach/pkg/roachpb"
1212
"google.golang.org/grpc"
13+
"storj.io/drpc"
1314
)
1415

1516
// TODODRPC is a marker to identify each RPC client creation site that needs to
@@ -20,11 +21,13 @@ const TODODRPC = false
2021
// node IDs.
2122
type NodeDialer interface {
2223
Dial(context.Context, roachpb.NodeID, ConnectionClass) (_ *grpc.ClientConn, err error)
24+
DRPCDial(context.Context, roachpb.NodeID, ConnectionClass) (_ drpc.Conn, err error)
2325
}
2426

2527
// NodeDialerNoBreaker interface defines methods for dialing peer nodes using their
2628
// node IDs. This interface is similar to NodeDialer but does not check the
2729
// breaker before dialing.
2830
type NodeDialerNoBreaker interface {
2931
DialNoBreaker(context.Context, roachpb.NodeID, ConnectionClass) (_ *grpc.ClientConn, err error)
32+
DRPCDialNoBreaker(context.Context, roachpb.NodeID, ConnectionClass) (_ drpc.Conn, err error)
3033
}

pkg/server/fanout_clients.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cockroachdb/cockroach/pkg/util/stop"
2828
"github.com/cockroachdb/errors"
2929
"google.golang.org/grpc"
30+
"storj.io/drpc"
3031
)
3132

3233
// serverID is a type that is either a `roachpb.NodeID`
@@ -51,6 +52,11 @@ type ServerIterator interface {
5152
dialNode(
5253
ctx context.Context, serverID serverID,
5354
) (*grpc.ClientConn, error)
55+
// drpcDialNode provides a DRPC connection to the node or SQL instance
56+
// identified by serverID.
57+
drpcDialNode(
58+
ctx context.Context, serverID serverID,
59+
) (drpc.Conn, error)
5460
// getAllNodes returns a map of all nodes in the cluster
5561
// or instances in the tenant with their liveness status.
5662
getAllNodes(
@@ -89,6 +95,12 @@ func (d *nodeDialer) Dial(
8995
return d.si.dialNode(ctx, serverID(nodeID))
9096
}
9197

98+
func (d *nodeDialer) DRPCDial(
99+
ctx context.Context, nodeID roachpb.NodeID, _ rpcbase.ConnectionClass,
100+
) (drpc.Conn, error) {
101+
return d.si.drpcDialNode(ctx, serverID(nodeID))
102+
}
103+
92104
type tenantFanoutClient struct {
93105
sqlServer *SQLServer
94106
rpcCtx *rpc.Context
@@ -157,6 +169,17 @@ func (t *tenantFanoutClient) dialNode(
157169
return t.rpcCtx.GRPCDialPod(instance.InstanceRPCAddr, id, instance.Locality, rpcbase.DefaultClass).Connect(ctx)
158170
}
159171

172+
func (t *tenantFanoutClient) drpcDialNode(
173+
ctx context.Context, serverID serverID,
174+
) (drpc.Conn, error) {
175+
id := base.SQLInstanceID(serverID)
176+
instance, err := t.sqlServer.sqlInstanceReader.GetInstance(ctx, id)
177+
if err != nil {
178+
return nil, err
179+
}
180+
return t.rpcCtx.DRPCDialPod(instance.InstanceRPCAddr, id, instance.Locality, rpcbase.DefaultClass).Connect(ctx)
181+
}
182+
160183
func (t *tenantFanoutClient) getAllNodes(
161184
ctx context.Context,
162185
) (map[serverID]livenesspb.NodeLivenessStatus, error) {
@@ -238,6 +261,15 @@ func (k kvFanoutClient) dialNode(ctx context.Context, serverID serverID) (*grpc.
238261
return k.rpcCtx.GRPCDialNode(addr.String(), id, locality, rpcbase.DefaultClass).Connect(ctx)
239262
}
240263

264+
func (k kvFanoutClient) drpcDialNode(ctx context.Context, serverID serverID) (drpc.Conn, error) {
265+
id := roachpb.NodeID(serverID)
266+
addr, locality, err := k.gossip.GetNodeIDAddress(id)
267+
if err != nil {
268+
return nil, err
269+
}
270+
return k.rpcCtx.DRPCDialNode(addr.String(), id, locality, rpcbase.DefaultClass).Connect(ctx)
271+
}
272+
241273
func (k kvFanoutClient) listNodes(ctx context.Context) (*serverpb.NodesResponse, error) {
242274
ctx = authserver.ForwardSQLIdentityThroughRPCCalls(ctx)
243275
ctx = k.ambientCtx.AnnotateCtx(ctx)

pkg/upgrade/upgradecluster/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ go_test(
5353
"//pkg/util/leaktest",
5454
"//pkg/util/retry",
5555
"//pkg/util/syncutil",
56+
"@io_storj_drpc//:drpc",
5657
"@org_golang_google_grpc//:grpc",
5758
],
5859
)

pkg/upgrade/upgradecluster/helper_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/util/retry"
2020
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
2121
"google.golang.org/grpc"
22+
"storj.io/drpc"
2223
)
2324

2425
type NoopDialer struct{}
@@ -29,6 +30,12 @@ func (n NoopDialer) Dial(
2930
return nil, nil
3031
}
3132

33+
func (n NoopDialer) DRPCDial(
34+
ctx context.Context, id roachpb.NodeID, class rpcbase.ConnectionClass,
35+
) (drpc.Conn, error) {
36+
return nil, nil
37+
}
38+
3239
var _ rpcbase.NodeDialer = NoopDialer{}
3340

3441
func TestHelperEveryNode(t *testing.T) {

0 commit comments

Comments
 (0)