Skip to content

Commit 9af31f0

Browse files
*: create DRPC RPC clients conditionally
Create DRPC RPC clients when DRPC is enabled. Although DRPC is currently disabled by default, it will be eventually controlled via `rpc.experimental_drpc.enabled` cluster setting. Epic: CRDB-48923 Fixes: none Release note: none
1 parent 79e136a commit 9af31f0

File tree

27 files changed

+227
-51
lines changed

27 files changed

+227
-51
lines changed

pkg/acceptance/localcluster/cluster.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,11 @@ func (n *Node) StatusClient(ctx context.Context) serverpb.RPCStatusClient {
485485
}
486486
return serverpb.NewGRPCStatusClientAdapter(conn)
487487
}
488-
return nil // This should never happen
488+
conn, err := n.rpcCtx.DRPCUnvalidatedDial(n.RPCAddr(), roachpb.Locality{}).Connect(ctx)
489+
if err != nil {
490+
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
491+
}
492+
return serverpb.NewDRPCStatusClientAdapter(conn)
489493
}
490494

491495
func (n *Node) logDir() string {

pkg/blobs/blobspb/rpc_clients.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,9 @@ func DialBlobClient(
2525
}
2626
return NewGRPCBlobClientAdapter(conn), nil
2727
}
28-
return nil, nil
28+
conn, err := nd.DRPCDial(ctx, nodeID, class)
29+
if err != nil {
30+
return nil, err
31+
}
32+
return NewDRPCBlobClientAdapter(conn), nil
2933
}

pkg/cli/rpc_clients.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,12 @@ func newClientConn(ctx context.Context, cfg server.Config) (rpcConn, func(), err
107107
return nil, nil, errors.Wrap(err, "failed to connect to the node")
108108
}
109109
return &grpcConn{conn: cc}, finish, nil
110-
} else {
111-
dc, finish, err := rpc.NewDRPCClientConn(ctx, ccfg)
112-
if err != nil {
113-
return nil, nil, errors.Wrap(err, "failed to connect to the node")
114-
}
115-
return &drpcConn{conn: dc}, finish, nil
116110
}
111+
dc, finish, err := rpc.NewDRPCClientConn(ctx, ccfg)
112+
if err != nil {
113+
return nil, nil, errors.Wrap(err, "failed to connect to the node")
114+
}
115+
return &drpcConn{conn: dc}, finish, nil
117116
}
118117

119118
// dialAdminClient dials a client connection and returns an AdminClient and a

pkg/gossip/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ go_library(
4141
"@com_github_cockroachdb_errors//:errors",
4242
"@com_github_cockroachdb_logtags//:logtags",
4343
"@com_github_cockroachdb_redact//:redact",
44+
"@io_storj_drpc//:drpc",
4445
"@org_golang_google_grpc//:grpc",
4546
],
4647
)

pkg/gossip/client.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
2323
"github.com/cockroachdb/errors"
2424
"google.golang.org/grpc"
25+
drpc "storj.io/drpc"
2526
)
2627

2728
// client is a client-side RPC connection to a gossip peer node.
@@ -415,6 +416,21 @@ func (c *client) dial(ctx context.Context, rpcCtx *rpc.Context) (*grpc.ClientCon
415416
return conn.Connect(ctx)
416417
}
417418

419+
// dials the peer node and returns a DRPC connection to the peer node.
420+
func (c *client) drpcDial(ctx context.Context, rpcCtx *rpc.Context) (drpc.Conn, error) {
421+
var conn *rpc.DRPCConnection
422+
if c.peerID != 0 {
423+
conn = rpcCtx.DRPCDialNode(c.addr.String(), c.peerID, c.locality, rpcbase.SystemClass)
424+
} else {
425+
// TODO(baptist): Use this as a temporary connection for getting
426+
// onto gossip and then replace with a validated connection.
427+
log.Infof(ctx, "unvalidated bootstrap gossip dial to %s", c.addr)
428+
conn = rpcCtx.DRPCUnvalidatedDial(c.addr.String(), c.locality)
429+
}
430+
431+
return conn.Connect(ctx)
432+
}
433+
418434
// dialGossipClient establishes a DRPC connection if enabled; otherwise,
419435
// it falls back to gRPC. The established connection is used to create a
420436
// RPCGossipClient.
@@ -428,5 +444,9 @@ func (c *client) dialGossipClient(
428444
}
429445
return NewGRPCGossipClientAdapter(conn), nil
430446
}
431-
return nil, nil
447+
conn, err := c.drpcDial(ctx, rpcCtx)
448+
if err != nil {
449+
return nil, err
450+
}
451+
return NewDRPCGossipClientAdapter(conn), nil
432452
}

pkg/keyvisualizer/keyvispb/rpc_clients.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,9 @@ func DialKeyVisualizerClient(
2525
}
2626
return NewGRPCKeyVisualizerClientAdapter(conn), nil
2727
}
28-
return nil, nil
28+
conn, err := nd.DRPCDial(ctx, nodeID, class)
29+
if err != nil {
30+
return nil, err
31+
}
32+
return NewDRPCKeyVisualizerClientAdapter(conn), nil
2933
}

pkg/kv/kvclient/kvtenant/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ go_library(
4646
"//pkg/util/uuid",
4747
"@com_github_cockroachdb_errors//:errors",
4848
"@com_github_cockroachdb_errors//errorspb",
49+
"@io_storj_drpc//:drpc",
4950
"@org_golang_google_grpc//:grpc",
5051
"@org_golang_google_grpc//codes",
5152
"@org_golang_google_grpc//status",

pkg/kv/kvclient/kvtenant/connector.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import (
5151
"google.golang.org/grpc"
5252
"google.golang.org/grpc/codes"
5353
"google.golang.org/grpc/status"
54+
"storj.io/drpc"
5455
)
5556

5657
func init() {
@@ -993,6 +994,19 @@ func (c *connector) dialAddrs(ctx context.Context) (*client, error) {
993994
RPCTimeSeriesClient: tspb.NewGRPCTimeSeriesClientAdapter(conn),
994995
}, nil
995996
}
997+
conn, err := c.drpcDialAddr(ctx, addr)
998+
if err != nil {
999+
log.Warningf(ctx, "error dialing tenant KV address %s: %v", addr, err)
1000+
continue
1001+
}
1002+
return &client{
1003+
RPCTenantServiceClient: kvpb.NewDRPCTenantServiceClientAdapter(conn),
1004+
RPCTenantSpanConfigClient: kvpb.NewDRPCTenantSpanConfigClientAdapter(conn),
1005+
RPCTenantUsageClient: kvpb.NewDRPCTenantUsageClientAdapter(conn),
1006+
RPCStatusClient: serverpb.NewDRPCStatusClientAdapter(conn),
1007+
RPCAdminClient: serverpb.NewDRPCAdminClientAdapter(conn),
1008+
RPCTimeSeriesClient: tspb.NewDRPCTimeSeriesClientAdapter(conn),
1009+
}, nil
9961010
}
9971011
}
9981012
return nil, errors.Wrap(ctx.Err(), "dial addrs")
@@ -1009,6 +1023,17 @@ func (c *connector) dialAddr(ctx context.Context, addr string) (conn *grpc.Clien
10091023
return conn, err
10101024
}
10111025

1026+
func (c *connector) drpcDialAddr(ctx context.Context, addr string) (conn drpc.Conn, err error) {
1027+
if c.rpcDialTimeout == 0 {
1028+
return c.rpcContext.DRPCUnvalidatedDial(addr, roachpb.Locality{}).Connect(ctx)
1029+
}
1030+
err = timeutil.RunWithTimeout(ctx, "dial addr", c.rpcDialTimeout, func(ctx context.Context) error {
1031+
conn, err = c.rpcContext.DRPCUnvalidatedDial(addr, roachpb.Locality{}).Connect(ctx)
1032+
return err
1033+
})
1034+
return conn, err
1035+
}
1036+
10121037
func (c *connector) tryForgetClient(ctx context.Context, client *client) {
10131038
if ctx.Err() != nil {
10141039
// Error (may be) due to context. Don't forget client.

pkg/kv/kvserver/closedts/ctpb/rpc_clients.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,9 @@ func DialSideTransportClient(
2525
}
2626
return NewGRPCSideTransportClientAdapter(conn), nil
2727
}
28-
return nil, nil
28+
conn, err := nd.DRPCDial(ctx, nodeID, class)
29+
if err != nil {
30+
return nil, err
31+
}
32+
return NewDRPCSideTransportClientAdapter(conn), nil
2933
}

pkg/kv/kvserver/rpc_clients.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ func DialMultiRaftClient(
2525
}
2626
return NewGRPCMultiRaftClientAdapter(conn), nil
2727
}
28-
return nil, nil
28+
conn, err := nd.DRPCDial(ctx, nodeID, class)
29+
if err != nil {
30+
return nil, err
31+
}
32+
return NewDRPCMultiRaftClientAdapter(conn), nil
2933
}
3034

3135
// DialPerReplicaClient establishes a DRPC connection if enabled; otherwise,
@@ -41,7 +45,11 @@ func DialPerReplicaClient(
4145
}
4246
return NewGRPCPerReplicaClientAdapter(conn), nil
4347
}
44-
return nil, nil
48+
conn, err := nd.DRPCDial(ctx, nodeID, class)
49+
if err != nil {
50+
return nil, err
51+
}
52+
return NewDRPCPerReplicaClientAdapter(conn), nil
4553
}
4654

4755
// DialPerStoreClient establishes a DRPC connection if enabled; otherwise,
@@ -57,5 +65,9 @@ func DialPerStoreClient(
5765
}
5866
return NewGRPCPerStoreClientAdapter(conn), nil
5967
}
60-
return nil, nil
68+
conn, err := nd.DRPCDial(ctx, nodeID, class)
69+
if err != nil {
70+
return nil, err
71+
}
72+
return NewDRPCPerStoreClientAdapter(conn), nil
6173
}

0 commit comments

Comments
 (0)